Re: [PR] Fix incorrect Java equals comparison of Uuid by reference [kafka]

2024-04-12 Thread via GitHub


alok123t commented on code in PR #15707:
URL: https://github.com/apache/kafka/pull/15707#discussion_r1563676834


##
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##
@@ -270,7 +270,7 @@ class DefaultAlterPartitionManager(
 
 inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { 
case (topicName, items) =>
   val topicId = items.head.topicIdPartition.topicId
-  canUseTopicIds &= topicId != Uuid.ZERO_UUID
+  canUseTopicIds &= !topicId.equals(Uuid.ZERO_UUID)

Review Comment:
   Right, we don't need to do this for Scala files; reverted the change in 
https://github.com/apache/kafka/pull/15707/commits/e206a2cfdfa955b3ab004b5644e28e6fa23999b9
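For background, a minimal illustration of the bug class being fixed (the `Id` class below is hypothetical, not Kafka's `org.apache.kafka.common.Uuid`): in Java, `!=` on objects compares references, while Scala's `==`/`!=` delegate to `equals`, which is why the fix is needed in Java files but not in Scala ones.

```java
// Hypothetical value class standing in for org.apache.kafka.common.Uuid;
// NOT the real Kafka class, just an illustration.
class UuidDemo {
    static final class Id {
        final long hi;
        final long lo;
        Id(long hi, long lo) { this.hi = hi; this.lo = lo; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Id)) return false;
            Id other = (Id) o;
            return hi == other.hi && lo == other.lo;
        }
        @Override public int hashCode() { return Long.hashCode(hi) * 31 + Long.hashCode(lo); }
    }

    // Java's != compares references: two equal-valued Ids still "differ".
    static boolean referenceDiffers(Id a, Id b) { return a != b; }

    // The comparison the code actually intends: value inequality via equals().
    static boolean valueDiffers(Id a, Id b) { return !a.equals(b); }

    public static void main(String[] args) {
        Id zero1 = new Id(0L, 0L);
        Id zero2 = new Id(0L, 0L);
        System.out.println(referenceDiffers(zero1, zero2)); // true  (distinct objects)
        System.out.println(valueDiffers(zero1, zero2));     // false (same value)
    }
}
```

In Scala, `topicId != Uuid.ZERO_UUID` already compiles to a negated `equals` call, so the Scala code was doing a value comparison all along.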



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16511) Leaking tiered segments

2024-04-12 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16511:
-
Description: 
I have some topics that have not been written to for a few days (with 12h 
retention) where some data remains on tiered storage (in our case S3) and is 
never deleted.

 

Looking at the log history, it appears that we never even tried to delete these 
segments:

When looking at one of the non-leaking segments, I get the following interesting 
messages:

 
{code:java}
"2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
segment-end-offset: 2976819 and segment-epochs: [5]"
"2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
for completed successfully 
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ}
, startOffset=2968418, endOffset=2976819, brokerId=10029, 
maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
"2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ}
, startOffset=2968418, endOffset=2976819, brokerId=10029, 
maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
"2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
02968418.log to remote storage with segment-id: 
RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ}"
"2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
completed successfully, metadata: 
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ}
, startOffset=2968418, endOffset=2976819, brokerId=10029, 
maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
"2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
id=fqGng3UURCG3-v4lETeLKQ}
, startOffset=2968418, endOffset=2976819, brokerId=10029, 
maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
{code}
 

This looks right: we can see logs from both the plugin and the remote log 
manager indicating that the remote log segment was removed.
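The deletion decision visible in the non-leaking case can be read off the "due to leader-epoch-cache truncation" log line above as a simple offset comparison (a simplified sketch of the reported condition, not the actual RemoteLogManager logic):

```java
// Simplified reading of the log line: a remote segment whose end offset
// falls below the current earliest epoch entry's start offset is eligible
// for deletion by leader-epoch-cache truncation.
class EpochTruncationCheck {
    static boolean deletableByEpochTruncation(long segmentEndOffset, long earliestEpochStartOffset) {
        return segmentEndOffset < earliestEpochStartOffset;
    }

    public static void main(String[] args) {
        // Values from the log above: segment-end-offset 2976819,
        // earliest-epoch-entry startOffset 2980106.
        System.out.println(deletableByEpochTruncation(2976819L, 2980106L)); // true
    }
}
```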

Now if I look at one of the leaked segments, here is what I see:

 
{code:java}
"2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
02971163.log to remote storage with segment-id: 
RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
id=8dP13VDYSaiFlubl9SNBTQ}"
"2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
completed successfully, metadata: 
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
id=8dP13VDYSaiFlubl9SNBTQ}
, startOffset=2971163, endOffset=2978396, brokerId=10001, 
maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
"2024-04-02T00:43:20.003Z","""kafka""","""10001""","Copying log segment data, 
metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
id=8dP13VDYSaiFlubl9SNBTQ}
, startOffset=2971163, endOffset=2978396, brokerId=10001, 
maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
 
{code}
I see no errors whatsoever indicating that the remote log deletion was 
actually triggered and failed. 

I tried rolling-restarting my cluster to see if refreshing 

[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-12 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836762#comment-17836762
 ] 

Kamal Chandraprakash commented on KAFKA-16511:
--

[~fvisconte] 
The issue might be due to overlapping remote log segments after a new leader 
gets elected during a rolling restart. Could you please upload the 
remote-log-segment metadata events for the past 10 segments of the 
5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765 partition? Thanks!
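The suspected overlap could be spotted in the uploaded metadata with a simple scan over (startOffset, endOffset) pairs sorted by start offset (a hypothetical sketch, not an existing Kafka tool):

```java
class SegmentOverlapCheck {
    // segments: rows of {startOffset, endOffset}, sorted by startOffset.
    static boolean hasOverlap(long[][] segments) {
        for (int i = 1; i < segments.length; i++) {
            if (segments[i][0] <= segments[i - 1][1]) {
                return true; // next segment starts before the previous one ends
            }
        }
        return false;
    }
}
```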

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics that have not been written to for a few days (with 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> is never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segments, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> This looks right: we can see logs from both the plugin and the remote log 
> manager indicating that the remote log segment was removed.
> Now if I look at one of the leaked segments, here is what I see:
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=1

Re: [PR] debug for #15489 [kafka]

2024-04-12 Thread via GitHub


chia7712 closed pull request #15654: debug for #15489
URL: https://github.com/apache/kafka/pull/15654





Re: [PR] debug for #15489 [kafka]

2024-04-12 Thread via GitHub


chia7712 commented on PR #15654:
URL: https://github.com/apache/kafka/pull/15654#issuecomment-2053016969

   closing, as we now see the root cause





Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-04-12 Thread via GitHub


chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1562683822


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -391,4 +428,30 @@ private String[] addBootstrapServer(String... args) {
 
 return newArgs.toArray(new String[0]);
 }
+
+private void retryUntilEqual(List<String> expected, Supplier<List<String>> outputSupplier) {
+try {
+TestUtils.waitForCondition(
+() -> expected.equals(outputSupplier.get()),
+"TopicOffsets did not match. Expected: " + 
expectedTestTopicOffsets() + ", but was: "
++ outputSupplier.get() + ". Final offsets: " + 
getFinalOffsets()
+);
+} catch (InterruptedException | ExecutionException e) {
+throw new RuntimeException(e);
+}
+}
+
+private Set getFinalOffsets() throws ExecutionException, 
InterruptedException {

Review Comment:
   we can remove this function now.



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -94,15 +109,38 @@ private void setUp() {
 }
 }
 
+private void createConsumerAndPoll() {
+Properties props = new Properties();
+props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
+props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+List<String> topics = new ArrayList<>();
+for (int i = 0; i < topicCount + 1; i++) {
+topics.add(getTopicName(i));
+}
+consumer.subscribe(topics);
+consumer.poll(consumerTimeout);
+}
+}
+
 static class Row {
 private String name;
 private int partition;
-private Long timestamp;
+private Long offset;

Review Comment:
   Could you add `final` to those fields?
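A sketch of the suggested change (field names taken from the diff; the constructor and accessors are assumed, not copied from the PR):

```java
// Row with all fields final, per the review suggestion: the class becomes
// an immutable value holder.
class Row {
    private final String name;
    private final int partition;
    private final Long offset;

    Row(String name, int partition, Long offset) {
        this.name = name;
        this.partition = partition;
        this.offset = offset;
    }

    String name() { return name; }
    int partition() { return partition; }
    Long offset() { return offset; }
}
```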



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -391,4 +428,30 @@ private String[] addBootstrapServer(String... args) {
 
 return newArgs.toArray(new String[0]);
 }
+
+private void retryUntilEqual(List<String> expected, Supplier<List<String>> outputSupplier) {
+try {
+TestUtils.waitForCondition(

Review Comment:
   we don't need this retry now



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -48,13 +62,14 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ZK)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @Tag("integration")
 public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
 private final String topicName = "topic";
+private final Duration consumerTimeout = Duration.ofMillis(1000);

Review Comment:
   this can be a local variable






Re: [PR] Fix incorrect equals comparison of Uuid by reference [kafka]

2024-04-12 Thread via GitHub


chia7712 commented on code in PR #15707:
URL: https://github.com/apache/kafka/pull/15707#discussion_r1563600417


##
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##
@@ -270,7 +270,7 @@ class DefaultAlterPartitionManager(
 
 inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { 
case (topicName, items) =>
   val topicId = items.head.topicIdPartition.topicId
-  canUseTopicIds &= topicId != Uuid.ZERO_UUID
+  canUseTopicIds &= !topicId.equals(Uuid.ZERO_UUID)

Review Comment:
   IIRC, in Scala `==`/`!=` call the `equals` method on the first operand, so the original Scala code was already a value comparison. 






[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836750#comment-17836750
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/12/24 10:06 PM:
--

The KIP mentions nothing about static membership. I would also add that the 
current behavior seems to solve the wrong use case. A static member with 
persistent state is more likely to want to keep membership alive, while a 
member with transient/non-persistent state would want to relinquish membership 
on shutdown.

I would consider this a bug as well. As evidence, my latest workaround involves 
using a random UUID as the "group.instance.id", hardly seems like static 
membership.

I can certainly pick this up, but no promises on the expediency ;)


was (Author: JIRAUSER305028):
The KIP mentions nothing about static membership. I would also add that the 
current behavior seems to solve the wrong use case. A static member with 
persistent state is more likely to want to keep membership alive, while a 
member with transient/non-persistent state would want to relinquish membership 
on shutdown.

I would consider this a bug as well. As evidence, my latest workaround involves 
using a random UUID as the "group.instance.id", hardly seems static.

I can certainly pick this up, but no promises on the expediency ;)

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Fix incorrect equals comparison of Uuid by reference [kafka]

2024-04-12 Thread via GitHub


alok123t commented on PR #15707:
URL: https://github.com/apache/kafka/pull/15707#issuecomment-2052621773

   I did something similar to https://stackoverflow.com/a/73535824 to find all 
usages in the codebase





[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836750#comment-17836750
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/12/24 10:03 PM:
--

The KIP mentions nothing about static membership. I would also add that the 
current behavior seems to solve the wrong use case. A static member with 
persistent state is more likely to want to keep membership alive, while a 
member with transient/non-persistent state would want to relinquish membership 
on shutdown.

I would consider this a bug as well. As evidence, my latest workaround involves 
using a random UUID as the "group.instance.id", hardly seems static.

I can certainly pick this up, but no promises on the expediency ;)


was (Author: JIRAUSER305028):
The KIP mentions nothing about static membership. I would also add that the 
current behavior seems to solve the wrong use case. A static member with 
persistent state is more likely to want to keep membership alive, while a 
member with transient/non-persistent state would want to relinquish membership 
on shutdown. I would consider this a bug as well.

I can certainly pick this up, but no promises on the expediency ;)

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836750#comment-17836750
 ] 

Sal Sorrentino commented on KAFKA-16514:


The KIP mentions nothing about static membership. I would also add that the 
current behavior seems to solve the wrong use case. A static member with 
persistent state is more likely to want to keep membership alive, while a 
member with transient/non-persistent state would want to relinquish membership 
on shutdown. I would consider this a bug as well.

I can certainly pick this up, but no promises on the expediency ;)

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





Re: [PR] MINOR: Fix incorrect equals comparison of topicId by reference in PartitionMetadataFile [kafka]

2024-04-12 Thread via GitHub


alok123t commented on PR #15707:
URL: https://github.com/apache/kafka/pull/15707#issuecomment-2052600875

   @jolshan @chia7712 Randomly looking at the codebase, I think this is 
prevalent in multiple places. Shall I update them all in this PR and also 
create a ticket?





Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-12 Thread via GitHub


junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1562991673


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will 
be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the 
log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset 
metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
_localLogStartOffset) {

Review Comment:
   Yes, if replication-offset-checkpoint is corrupted, HWM could temporarily be 
set to below local-log-start-offset. I am still trying to understand the impact 
of that. In the common case, the restarted broker can't become the leader or 
serve reads until it's caught up. At that time, the HWM will be up to date. In 
the rare case, the restarted broker is elected as the leader through unclean 
election before catching up. Is this the case that you want to address?
   
   The jira also says:
   
   > If the high watermark is less than the local-log-start-offset, then the 
[UnifiedLog#fetchHighWatermarkMetadata](https://sourcegraph.com/github.com/apache/kafka@d4caa1c10ec81b9c87eaaf52b73c83d5579b68d3/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L358)
 method will throw the OFFSET_OUT_OF_RANGE error when it converts the offset to 
metadata. Once this error happens, the followers will receive out-of-range 
exceptions and the producers won't be able to produce messages since the leader 
cannot move the high watermark.
   
   However, the follower read is bounded by logEndOffset, not HWM? Where does 
the follower read need to convert HWM to metadata?



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -318,6 +318,80 @@ class UnifiedLogTest {
 assertHighWatermark(4L)
   }
 
+  @Test
+  def testHighWatermarkMaintenanceForRemoteTopic(): Unit = {
+val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, 
remoteLogStorageEnable = true)
+val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+val leaderEpoch = 0
+
+def assertHighWatermark(offset: Long): Unit = {
+  assertEquals(offset, log.highWatermark)
+  assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+}
+
+// High watermark initialized to 0
+assertHighWatermark(0L)
+
+var offset = 0L
+for(_ <- 0 until 50) {
+  val records = TestUtils.singletonRecords("test".getBytes())
+  val info = log.appendAsLeader(records, leaderEpoch)
+  offset = info.lastOffset
+  if (offset != 0 && offset % 10 == 0)
+log.roll()
+}
+assertEquals(5, log.logSegments.size)
+
+// High watermark not changed by append
+assertHighWatermark(0L)
+
+// Update high watermark as leader
+log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L))
+assertHighWatermark(50L)
+assertEquals(50L, log.logEndOffset)
+
+// Cannot update high watermark past the log end offset
+log.updateHighWatermark(60L)
+assertHighWatermark(50L)
+
+// simulate calls to upload 3 segments to remote storage and remove them 
from local-log.
+log.updateHighestOffsetInRemoteStorage(30)
+log.maybeIncrementLocalLogStartOffset(31L, 
LogStartOffsetIncrementReason.SegmentDeletion)
+log.deleteOldSegments()
+assertEquals(2, log.logSegments.size)
+assertEquals(31L, log.localLogStartOffset())
+assertHighWatermark(50L)
+
+// simulate one remote-log segment deletion
+val logStartOffset = 11L
+log.maybeIncrementLogStartOffset(logStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
+assertEquals(11, log.logStartOffset)
+
+// Updating the HW below the log-start-offset / local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
+log.updateHighWatermark(new LogOffsetMetadata(5L))
+assertHighWatermark(31L)
+// Updating the HW between log-start-offset and local-log-start-offset is not allowed. HW should reset to local-log-start-offset.

Review Comment:
   This is moving HW below local-log-start-offset, not log-start-offset.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-12 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1563199330


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional<String> topicName) throws InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(),
+() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   calling `.get()` on an appendWriteEvent doesn't look right to me. If I 
understand correctly, the `appendWriteEvents` are handled in the quorum 
controller event loop thread.
   
   We would expect `replay()` to also be called in the event loop thread, so if 
we trigger an `appendWriteEvent` and block waiting for the result, it would 
always time out, since we are blocking the processing thread.
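The concern above can be reproduced with a minimal standalone sketch (plain `java.util.concurrent`, hypothetical names, not Kafka's controller code): a task running on a single-threaded "event loop" submits another task to the same loop and blocks on its result. The inner task can never start while the outer one holds the only worker thread, so a bounded wait times out (an unbounded `get()` would hang forever).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EventLoopDeadlockDemo {
    // Returns "timed out" because the inner task can never run while the
    // outer task occupies the single worker thread.
    static String run() throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Future<String> outer = eventLoop.submit(() -> {
                Future<String> inner = eventLoop.submit(() -> "done");
                try {
                    // Blocks the only worker thread of the loop.
                    return inner.get(200, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    return "timed out";
                }
            });
            return outer.get();
        } finally {
            eventLoop.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "timed out"
    }
}
```

The usual remedy is to chain a follow-up action onto the future instead of blocking the loop's own thread.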






Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-12 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1563195256


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
 log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
 configResource, record.name(), record.value());
 }
+if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {
+try {
+if (type == Type.TOPIC) {
+minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName()));
+} else {
+minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty());
+}
+} catch (InterruptedException | ExecutionException e) {
+throw new Throwable("Fail to append partition updates for the min isr update: " + e.getMessage());
+}

Review Comment:
   throwing as a throwable seems odd.
   



##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
 log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
 configResource, record.name(), record.value());
 }
+if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {
+try {
+if (type == Type.TOPIC) {
+minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName()));
+} else {
+minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty());
+}
+} catch (InterruptedException | ExecutionException e) {
+throw new Throwable("Fail to append partition updates for the min isr update: " + e.getMessage());
+}

Review Comment:
   throwing an exception as a throwable seems odd.
   






Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-12 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1563193794


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
 log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
 configResource, record.name(), record.value());
 }
+if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {

Review Comment:
   we shouldn't use `==` to compare strings.
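For context, a minimal standalone illustration of the reviewer's point (not the PR code; the string value is just an example): `==` on objects compares references, while `equals` compares contents, so two distinct `String` objects holding the same characters are `==`-unequal.

```java
public class StringEqualityDemo {
    public static void main(String[] args) {
        String a = "min.insync.replicas";
        String b = new String("min.insync.replicas"); // same content, distinct object

        System.out.println(a == b);      // false: reference comparison
        System.out.println(a.equals(b)); // true: content comparison
    }
}
```

The `==` form can appear to work when both sides are interned literals, which makes the bug easy to miss in review.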






[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836736#comment-17836736
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16514:


I haven't gone back and re-read the KIP, but IIRC the reason for adding these 
CloseOptions was specific to solving an issue with static membership, hence why 
it only takes effect there.

That said – I completely agree that there's no reason why this should only work 
with static membership, and the decision to not leave the group for 
non-static-membership is one of those biases that Kafka Streams has in assuming 
persistent state stores.

I would fully support changing the behavior to work with non-static-members 
rather than just updating the javadocs to explain this. [~mjsax] would this 
need a KIP? Or can we just consider this a "bug" (especially since the javadocs 
make no mention that it's intended to only work on static members) and since we 
don't need any API changes, simply make the change without a KIP?

Either way – [~sal.sorrentino] would you be interested in picking this up?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16545) Auto adjust the replica factor according to number of broker when using ClusterTestExtensions

2024-04-12 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16545:
--

 Summary: Auto adjust the replica factor according to number of 
broker when using ClusterTestExtensions
 Key: KAFKA-16545
 URL: https://issues.apache.org/jira/browse/KAFKA-16545
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


In most test cases, we start a single broker so as to save resources. However, this 
can cause errors when creating internal topics, since they require 3 replicas by 
default. In order to reduce the duplicated configs across all tests, we can add a bit 
of sugar to auto-adjust the replication factor (if it is not defined by the test) when 
the number of brokers started by the test is less than the default value.
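The proposed sugar could look roughly like the following (purely a sketch with hypothetical names; the actual change in `ClusterTestExtensions` may differ): clamp the replication factor to the number of started brokers whenever the test did not set one explicitly.

```java
public class ReplicationFactorAdjust {
    // Hypothetical helper: keep the requested replication factor when the
    // cluster can satisfy it, otherwise fall back to the broker count.
    static short effectiveReplicationFactor(short requested, int brokerCount) {
        return (short) Math.min(requested, brokerCount);
    }

    public static void main(String[] args) {
        // Single-broker test cluster with the default factor of 3:
        System.out.println(effectiveReplicationFactor((short) 3, 1)); // 1
        // Enough brokers available, so the default is kept:
        System.out.println(effectiveReplicationFactor((short) 3, 5)); // 3
    }
}
```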





[jira] [Assigned] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE

2024-04-12 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16544:
--

Assignee: Chia-Ping Tsai

> DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames 
> should return null instead of throwing NPE
> --
>
> Key: KAFKA-16544
> URL: https://issues.apache.org/jira/browse/KAFKA-16544
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {code:java}
>  * @return A future map from topic names to descriptions which can be 
> used to check
>  * the status of individual description if the describe topic 
> request used
>  * topic names, otherwise return null, this request succeeds only 
> if all the
>  * topic descriptions succeed
> {code}
> According to the docs, it should return null if we try to get a result that does 
> not match the request. For example, we call `allTopicNames` while passing a 
> `TopicIdCollection`. However, the current implementation throws an NPE directly.





[jira] [Created] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE

2024-04-12 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16544:
--

 Summary: DescribeTopicsResult#allTopicIds and 
DescribeTopicsResult#allTopicNames should return null instead of throwing NPE
 Key: KAFKA-16544
 URL: https://issues.apache.org/jira/browse/KAFKA-16544
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai


{code:java}
 * @return A future map from topic names to descriptions which can be used 
to check
 * the status of individual description if the describe topic 
request used
 * topic names, otherwise return null, this request succeeds only 
if all the
 * topic descriptions succeed
{code}

According to the docs, it should return null if we try to get a result that does not 
match the request. For example, we call `allTopicNames` while passing a 
`TopicIdCollection`. However, the current implementation throws an NPE directly.
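The documented contract can be sketched as follows (hypothetical simplified types, not the real `DescribeTopicsResult`): the accessor guards against the mismatched request type and returns null instead of dereferencing a null map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ResultAccessorDemo {
    // Simplified stand-in for DescribeTopicsResult: byName is null when the
    // describe request used topic ids rather than names.
    static class Result {
        private final Map<String, String> byName;

        Result(Map<String, String> byName) {
            this.byName = byName;
        }

        Map<String, String> allTopicNames() {
            if (byName == null) {
                return null; // request used topic ids; per the javadoc, return null
            }
            // Collections.unmodifiableMap(null) throws an NPE, which is the
            // failure mode the guard above avoids.
            return Collections.unmodifiableMap(byName);
        }
    }

    public static void main(String[] args) {
        Result idBased = new Result(null); // simulates a TopicIdCollection request
        System.out.println(idBased.allTopicNames()); // null, not an NPE

        Map<String, String> names = new HashMap<>();
        names.put("topic-a", "description");
        System.out.println(new Result(names).allTopicNames().size()); // 1
    }
}
```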





Re: [PR] MINOR: Various cleanups in clients [kafka]

2024-04-12 Thread via GitHub


chia7712 commented on code in PR #15705:
URL: https://github.com/apache/kafka/pull/15705#discussion_r1562938471


##
clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java:
##
@@ -37,13 +37,13 @@ public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements AutoCloseable
 //serves 2 purposes - 1st it maintains the ref objects reachable (which is a requirement for them
 //to ever be enqueued), 2nd keeps some (small) metadata for every buffer allocated
 private final Map buffersInFlight = new ConcurrentHashMap<>();
-private final GarbageCollectionListener gcListener = new GarbageCollectionListener();
 private final Thread gcListenerThread;
-private volatile boolean alive = true;
+private volatile boolean alive;
 
 public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) {
 super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor);
 this.alive = true;

Review Comment:
   it seems to me removing `this.alive = true;` is more suitable.



##
clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java:
##
@@ -79,12 +79,8 @@ public ByteBuffer get(int size) {
 @Override
 public void release(ByteBuffer buffer) {
 buffer.clear();
-Deque<ByteBuffer> bufferQueue = bufferMap.get(buffer.capacity());
-if (bufferQueue == null) {
-// We currently keep a single buffer in flight, so optimise for that case
-bufferQueue = new ArrayDeque<>(1);
-bufferMap.put(buffer.capacity(), bufferQueue);
-}
+Deque<ByteBuffer> bufferQueue = bufferMap.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>(1));
+// We currently keep a single buffer in flight, so optimise for that case

Review Comment:
   Can we move this comment up?
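As a side note on the refactor under review: `computeIfAbsent` collapses the get / null-check / put sequence into one call and only runs the mapping function when the key is missing. A standalone illustration (not the PR code):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class ComputeIfAbsentDemo {
    public static void main(String[] args) {
        Map<Integer, Deque<String>> bufferMap = new HashMap<>();

        // Allocates the deque only because the key is missing.
        Deque<String> q = bufferMap.computeIfAbsent(1024, k -> new ArrayDeque<>(1));
        q.add("buffer");
        System.out.println(bufferMap.get(1024).size()); // 1

        // Second call finds the key, so the mapping function is not invoked
        // and the existing deque (still holding one element) is returned.
        System.out.println(bufferMap.computeIfAbsent(1024, k -> new ArrayDeque<>(1)).size()); // 1
    }
}
```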



##
clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java:
##
@@ -60,7 +58,6 @@ public SslChannelBuilder(Mode mode,
 this.mode = mode;
 this.listenerName = listenerName;
 this.isInterBrokerListener = isInterBrokerListener;
-this.log = logContext.logger(getClass());

Review Comment:
   maybe we should pass it to `Utils.closeQuietly` instead of deleting it.



##
clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java:
##
@@ -136,8 +136,8 @@ private CoordinatorKey requireSingletonAndType(Set<CoordinatorKey> keys) {
 }
 
 private void ensureSameType(Set<CoordinatorKey> keys) {
-if (keys.size() < 1) {
-throw new IllegalArgumentException("Unexpected size of key set: expected >= 1, but got " + keys.size());
+if (keys.isEmpty()) {
+throw new IllegalArgumentException("Unexpected size of key set: expected >= 1, but got 0");

Review Comment:
   maybe we can say "the keys can't be empty"






Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-12 Thread via GitHub


junrao commented on PR #15631:
URL: https://github.com/apache/kafka/pull/15631#issuecomment-2052208460

   @kamalcph : Thanks for the update. JDK 11 and Scala 2.13 didn't complete in 
the last test run. Could you trigger another run of the tests?





[jira] [Comment Edited] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-04-12 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836717#comment-17836717
 ] 

Jun Rao edited comment on KAFKA-16541 at 4/12/24 5:43 PM:
--

Thanks for filing the jira, [~ocadaruma] !  Since this is a regression, it 
would be useful to have this fixed in 3.8.0 and 3.7.1.

One way to fix it is to (1) change LeaderEpochFileCache.truncateFromEnd and 
LeaderEpochFileCache.truncateFromStart to only write to memory without writing 
to the checkpoint file, and (2) change the implementation of 
[renameDir|https://github.com/apache/kafka/blob/3.6.0/core/src/main/scala/kafka/log/UnifiedLog.scala#L681]
 so that it doesn't reinitialize from the file and just changes the Path of the 
backing CheckpointFile.
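Option (2) could be sketched like this (a heavily simplified hypothetical class, not the actual `CheckpointFile` or `LeaderEpochFileCache`): a rename only re-points the backing path, so nothing is re-read from disk and the in-memory state stays untouched.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class CheckpointPathSwapDemo {
    // Hypothetical stand-in for a checkpoint file holder.
    static class Checkpoint {
        private volatile Path backingFile;

        Checkpoint(Path backingFile) {
            this.backingFile = backingFile;
        }

        // Swap the parent directory while keeping the file name; the
        // in-memory cache backed by this file is deliberately not reloaded.
        void updateParentDir(Path newParent) {
            backingFile = newParent.resolve(backingFile.getFileName());
        }

        Path path() {
            return backingFile;
        }
    }

    public static void main(String[] args) {
        Checkpoint cp = new Checkpoint(Paths.get("/data/topic-0", "leader-epoch-checkpoint"));
        cp.updateParentDir(Paths.get("/data/topic-0.deleted"));
        System.out.println(cp.path()); // e.g. /data/topic-0.deleted/leader-epoch-checkpoint
    }
}
```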


was (Author: junrao):
Thanks for filing the jira, [~ocadaruma] !  Since this is a regression, it 
would be useful to have this fixed in 3.8.0 and 3.7.1.

One way to fix it is to (1) change
LeaderEpochFileCache.truncateFromEnd and LeaderEpochFileCache.truncateFromStart 
to only write to memory without writing to the checkpoint file, (2) change the 
implementation of 
[renamDir|https://github.com/apache/kafka/blob/3.6.0/core/src/main/scala/kafka/log/UnifiedLog.scala#L681]
 so that it doesn't reinitialize from the file and just change the Path of the 
backing CheckpointFile.

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] got rid 
> of fsync of the leader-epoch checkpoint file in some paths for performance reasons.
> However, since the checkpoint file is now flushed to the device asynchronously 
> by the OS, its content could become corrupted if the OS suddenly crashes (e.g. by 
> power failure or kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.





[jira] [Commented] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-04-12 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836717#comment-17836717
 ] 

Jun Rao commented on KAFKA-16541:
-

Thanks for filing the jira, [~ocadaruma] !  Since this is a regression, it 
would be useful to have this fixed in 3.8.0 and 3.7.1.

One way to fix it is to (1) change
LeaderEpochFileCache.truncateFromEnd and LeaderEpochFileCache.truncateFromStart 
to only write to memory without writing to the checkpoint file, (2) change the 
implementation of 
[renameDir|https://github.com/apache/kafka/blob/3.6.0/core/src/main/scala/kafka/log/UnifiedLog.scala#L681]
 so that it doesn't reinitialize from the file and just changes the Path of the 
backing CheckpointFile.

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] got rid 
> of fsync of the leader-epoch checkpoint file in some paths for performance reasons.
> However, since the checkpoint file is now flushed to the device asynchronously 
> by the OS, its content could become corrupted if the OS suddenly crashes (e.g. by 
> power failure or kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.





[jira] [Updated] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-04-12 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-16541:

Affects Version/s: 3.7.0

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] got rid 
> of fsync of the leader-epoch checkpoint file in some paths for performance reasons.
> However, since the checkpoint file is now flushed to the device asynchronously 
> by the OS, its content could become corrupted if the OS suddenly crashes (e.g. by 
> power failure or kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.





Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2024-04-12 Thread via GitHub


junrao commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1562928115


##
server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java:
##
@@ -72,18 +72,20 @@ public CheckpointFile(File file,
 tempPath = Paths.get(absolutePath + ".tmp");
 }
 
-public void write(Collection<T> entries) throws IOException {
+public void write(Collection<T> entries, boolean sync) throws IOException {
 synchronized (lock) {
 // write to temp file and then swap with the existing file
 try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
  BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
 CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);
 checkpointWriteBuffer.write(entries);
 writer.flush();
-fileOutputStream.getFD().sync();
+if (sync) {
+fileOutputStream.getFD().sync();

Review Comment:
   Thanks, @ocadaruma !






Re: [PR] MINOR: Fix incorrect equals comparison of topicId by reference in PartitionMetadataFile [kafka]

2024-04-12 Thread via GitHub


chia7712 commented on PR #15707:
URL: https://github.com/apache/kafka/pull/15707#issuecomment-2052183089

   the following links have a similar issue. Should we fix them in this PR, or 
should I file a follow-up to fix them?
   
   
https://github.com/apache/kafka/blob/61baa7ac6bb871797197d9289a848a0d4f587ced/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L1704
   
   
https://github.com/apache/kafka/blob/e02ffd852fae7c1d2681621be7eb888e3805e027/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java#L299





Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-12 Thread via GitHub


cmccabe merged PR #15648:
URL: https://github.com/apache/kafka/pull/15648





Re: [PR] MINOR: Various cleanups in server and server-common [kafka]

2024-04-12 Thread via GitHub


chia7712 commented on code in PR #15710:
URL: https://github.com/apache/kafka/pull/15710#discussion_r1562828732


##
server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java:
##
@@ -41,9 +42,7 @@ public void testEmptyRegistry() {
 private static void assertIteratorContains(Iterator<Snapshot> iter, Snapshot... snapshots) {
 List<Snapshot> expected = new ArrayList<>();

Review Comment:
   How about `List<Snapshot> expected = Arrays.asList(snapshots);`?



##
server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java:
##
@@ -116,8 +116,7 @@ public synchronized boolean maybeUpdatePushRequestTimestamp(long currentTime) {
 */
 boolean canAccept = lastGetRequestTimestamp > lastPushRequestTimestamp;

Review Comment:
   how about `boolean canAccept = lastGetRequestTimestamp > lastPushRequestTimestamp || currentTime - lastPushRequestTimestamp >= pushIntervalMs;`?



##
server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java:
##
@@ -310,10 +309,8 @@ private static void assertIteratorYields(Iterator iter,
 }
 }
 if (!extraObjects.isEmpty() || !remaining.isEmpty()) {
-throw new RuntimeException("Found extra object(s): [" + String.join(", ",
-extraObjects.stream().map(e -> e.toString()).collect(Collectors.toList())) +
-"] and didn't find object(s): [" + String.join(", ",
-remaining.keySet().stream().map(e -> e.toString()).collect(Collectors.toList())) + "]");
+throw new RuntimeException("Found extra object(s): [" + extraObjects.stream().map(e -> e.toString()).collect(Collectors.joining(", ")) +

Review Comment:
   `e -> e.toString()` -> `Object::toString`



##
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##
@@ -321,7 +321,7 @@ public void run() throws Exception {
 AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data();
 
 Set<AssignmentEvent> failed = filterFailures(data, inflight);
-Set<AssignmentEvent> completed = Utils.diff(HashSet::new, inflight.values().stream().collect(Collectors.toSet()), failed);
+Set<AssignmentEvent> completed = Utils.diff(HashSet::new, new HashSet<>(inflight.values()), failed);

Review Comment:
   Maybe we can avoid creating the collection multiple times by skipping the 
failed elements instead:
   ```java
   Set<AssignmentEvent> failed = filterFailures(data, inflight);
   for (AssignmentEvent assignmentEvent : inflight.values()) {
       if (failed.contains(assignmentEvent)) continue;
       if (log.isDebugEnabled()) {
           log.debug("Successfully propagated assignment {}", assignmentEvent);
       }
       assignmentEvent.onComplete();
   }
   ```






Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]

2024-04-12 Thread via GitHub


kirktrue commented on PR #15408:
URL: https://github.com/apache/kafka/pull/15408#issuecomment-2052100630

   @lucasbru—I think it's worth adding the tests, even if we already know there 
are more to be added in the future.





[PR] MINOR: fix duplicated return and other streams docs typos [kafka]

2024-04-12 Thread via GitHub


AyoubOm opened a new pull request, #15713:
URL: https://github.com/apache/kafka/pull/15713

   (no comment)





Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-12 Thread via GitHub


chia7712 merged PR #15670:
URL: https://github.com/apache/kafka/pull/15670





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836671#comment-17836671
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

CloseOption was introduced via 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group]

The reasoning about the design should be on the KIP and corresponding DISCUSS 
thread.

I agree that the JavaDocs are missing a lot of information. Would you be 
interested in doing a PR to improve the JavaDocs?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Various cleanups in shell [kafka]

2024-04-12 Thread via GitHub


mimaison opened a new pull request, #15712:
URL: https://github.com/apache/kafka/pull/15712

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-12 Thread via GitHub


CalvinConfluent commented on PR #15702:
URL: https://github.com/apache/kafka/pull/15702#issuecomment-2052041676

   @mumrah Can you help take a look?





[PR] MINOR: Various cleanups in storage [kafka]

2024-04-12 Thread via GitHub


mimaison opened a new pull request, #15711:
URL: https://github.com/apache/kafka/pull/15711

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] Various cleanups in server and server-common [kafka]

2024-04-12 Thread via GitHub


mimaison opened a new pull request, #15710:
URL: https://github.com/apache/kafka/pull/15710

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] MINOR: Various cleanups in tools [kafka]

2024-04-12 Thread via GitHub


mimaison opened a new pull request, #15709:
URL: https://github.com/apache/kafka/pull/15709

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] Various cleanups in trogdor [kafka]

2024-04-12 Thread via GitHub


mimaison opened a new pull request, #15708:
URL: https://github.com/apache/kafka/pull/15708

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836644#comment-17836644
 ] 

Sal Sorrentino commented on KAFKA-16514:


Digging further into this, it seems the leaveGroup option is only supported if 
the "group.instance.id" is supplied via the StreamsConfig; however, there is no 
documentation around this. The "group.instance.id" has no representation in the 
StreamsConfig class, even though it is intended to be a user-supplied 
configuration providing static membership.

It is unclear to me why this flag is only supported for static membership since 
a consumer group member can "leave" a group at any point in time without static 
membership.
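As a hedged illustration of the configuration the comment describes (the `consumer.` prefix is how Streams routes settings to its embedded consumers; the instance id value below is made up, not taken from this thread):

```properties
# Hypothetical properties fragment: static membership is enabled by giving the
# embedded consumer a group.instance.id; the value here is illustrative.
consumer.group.instance.id=my-app-instance-1
```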

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





[jira] [Assigned] (KAFKA-15359) Support log.dir.failure.timeout.ms configuration for JBOD

2024-04-12 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-15359:
---

Assignee: (was: Igor Soarez)

> Support log.dir.failure.timeout.ms configuration for JBOD
> -
>
> Key: KAFKA-15359
> URL: https://issues.apache.org/jira/browse/KAFKA-15359
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Priority: Major
>
> If the broker repeatedly fails to communicate a log directory failure after a 
> configurable amount of time — {{log.dir.failure.timeout.ms}} — and it is the 
> leader for any replicas in the failed log directory, the broker will shut 
> down, as that is the only other way to guarantee that the controller will 
> elect a new leader for those partitions.





Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-04-12 Thread via GitHub


soarez commented on code in PR #15697:
URL: https://github.com/apache/kafka/pull/15697#discussion_r1562612304


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -507,6 +522,7 @@ class BrokerLifecycleManager(
 if (errorCode == Errors.NONE) {
   val responseData = message.data()
   failedAttempts = 0
+  offlineDirs = offlineDirs.map(kv => kv._1 -> true)

Review Comment:
   I think this is incorrect. If a new failed directory is added to 
`offlineDirs` in-between a heartbeat request-response, then we'll clear it here 
before knowing if it will be propagated to the controller.
   
   One idea is to hand down the offline dirs set in the request in 
`sendBrokerHeartBeat()` to `BrokerHeartbeatResponseEvent` through 
`BrokerHeartbeatResponseHandler` as a new constructor argument.



##
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -94,6 +94,7 @@ public class Defaults {
 public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 
6;
 public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1;
 public static final boolean AUTO_CREATE_TOPICS_ENABLE = true;
+public static final long LOG_DIR_FAILURE_TIMEOUT_MS = 3L;

Review Comment:
   This default seems reasonable to me.



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -211,7 +211,8 @@ class BrokerServer(
 time,
 s"broker-${config.nodeId}-",
 isZkBroker = false,
-logDirs = logManager.directoryIdsSet)
+logDirs = logManager.directoryIdsSet,
+() => kafkaScheduler.schedule("shutdown", () => shutdown(), 0, -1))

Review Comment:
   There's a `scheduleOnce` alternative which sets `periodMs` to `-1`.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -528,6 +529,10 @@ object KafkaConfig {
 "If log.message.timestamp.type=CreateTime, the message will be rejected if 
the difference in timestamps exceeds " +
 "this specified threshold. This configuration is ignored if 
log.message.timestamp.type=LogAppendTime."
 
+  val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully 
communicate to the controller that some log " +
+"directory has failed for longer than this time, and there's at least one 
partition with leadership on that directory, " +

Review Comment:
   > and there's at least one partition with leadership
   
   We aren't checking for this condition. We can either a) implement it; or b) 
keep it simple and drop this out of the configuration description.



##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -327,16 +333,25 @@ class BrokerLifecycleManager(
   private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
 override def run(): Unit = {
   if (offlineDirs.isEmpty) {
-offlineDirs = Set(dir)
+offlineDirs = Map(dir -> false)
   } else {
-offlineDirs = offlineDirs + dir
+offlineDirs += (dir -> false)
   }
   if (registered) {
 scheduleNextCommunicationImmediately()
   }
 }
   }
 
+  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends 
EventQueue.Event {
+override def run(): Unit = {
+  if (!offlineDirs.getOrElse(offlineDir, false)) {
+error(s"Shutting down because couldn't communicate offline log dirs 
with controllers")

Review Comment:
   We should include the directory in the error. It might also be helpful to 
resolve the directory ID to its path. Perhaps something like `dirIdToPath` in 
`AssignmentsManager` should be made available here as well.






Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-04-12 Thread via GitHub


dajac commented on PR #15536:
URL: https://github.com/apache/kafka/pull/15536#issuecomment-2051891235

   Sorry for the delay on this one. I will review it next week.





[PR] MINOR: Fix incorrect equals comparison of topicId by reference in PartitionMetadataFile [kafka]

2024-04-12 Thread via GitHub


alok123t opened a new pull request, #15707:
URL: https://github.com/apache/kafka/pull/15707

   Previously, we were comparing uuid by reference, which could be incorrect. 
This PR replaces it with a comparison by value instead.
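The distinction at issue can be reproduced with plain `java.util.UUID` (Kafka's `Uuid` is a separate class, but its `==` vs `equals` behavior is analogous; this is a minimal standalone sketch, not the patched code):

```java
import java.util.UUID;

public class UuidDemo {
    public static void main(String[] args) {
        // Two distinct objects holding the same 128-bit value.
        UUID a = new UUID(0L, 0L);
        UUID b = new UUID(0L, 0L);
        System.out.println("a == b: " + (a == b));          // compares references
        System.out.println("a.equals(b): " + a.equals(b));  // compares values
    }
}
```

Because `==` only checks object identity, the comparison passes or fails depending on whether both sides happen to be the same instance, which is why value comparison is the safe default in Java code.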
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16543: There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0 [kafka]

2024-04-12 Thread via GitHub


dajac commented on PR #15706:
URL: https://github.com/apache/kafka/pull/15706#issuecomment-2051869550

   Hi @hudeqi. Thanks for the patch. I would like to better understand it. My 
first question is: how would Flink commit offsets with a generationId equal to -1? 
The generation of the group is only managed by the group. It is not possible to 
alter it from an external system. The -1 passed in the offset commit request is 
only used for validation purposes.
   
   The reason why we don't write a tombstone in this case is because the group 
was never materialized in the log if it stayed at generation 0. I am not sure 
it is a worthwhile optimization though.





[PR] KAFKA-16543: There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0 [kafka]

2024-04-12 Thread via GitHub


hudeqi opened a new pull request, #15706:
URL: https://github.com/apache/kafka/pull/15706

   In the `cleanupGroupMetadata` method, tombstone messages are written to 
delete the group's MetadataKey only when the group is in the Dead state and the 
generation is greater than 0. The comment indicates: 'We avoid writing the 
tombstone when the generationId is 0, since this group is only using Kafka for 
offset storage.' This means that groups that only use Kafka for offset storage 
should not be deleted. However, there is a situation where, for example, Flink 
commits offsets with a generationId equal to -1. If ApiKeys.DELETE_GROUPS is 
called to delete this group, Flink's group metadata will never be deleted. Yet, 
the logic above has already cleaned up commitKey by writing tombstone messages 
with `removedOffsets`. Therefore, the actual manifestation is: the group no 
longer exists (since the offsets have been cleaned up, there is no possibility 
of adding the group back to the `groupMetadataCache` unless offsets are 
committed again with the same group name), but the corresponding group 
metadata information still exists in __consumer_offsets. This leads to 
the problem that deleting the group does not completely clean up its related 
information.
   
   The group's state is set to Dead only in the following three situations:
   1. The group information is unloaded
   2. The group is deleted by ApiKeys.DELETE_GROUPS
   3. All offsets of the group have expired or been removed.
   
   Therefore, since the group is already in the Dead state and has been removed 
from the `groupMetadataCache`, why not directly clean up all the information of 
the group? Even if it is only used for storing offsets.
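The tombstone semantics this description relies on can be sketched with a toy log compaction. This is a standalone analogy, not the real `__consumer_offsets` record format, and the key names are made up: compaction keeps the latest value per key, and a null value (tombstone) removes the key entirely, so a key with no tombstone survives forever.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionDemo {
    public static void main(String[] args) {
        // Records as (key, value); a null value is a tombstone.
        List<Map.Entry<String, String>> log = new ArrayList<>();
        log.add(Map.entry("group1-offsets", "42"));
        log.add(Map.entry("group1-metadata", "gen=0"));
        log.add(new AbstractMap.SimpleEntry<>("group1-offsets", null)); // tombstone

        // Toy compaction: latest value per key wins; tombstones drop the key.
        Map<String, String> compacted = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : log) {
            if (r.getValue() == null) {
                compacted.remove(r.getKey());
            } else {
                compacted.put(r.getKey(), r.getValue());
            }
        }
        // The metadata key was never tombstoned, so it survives compaction.
        System.out.println("compacted keys: " + compacted.keySet());
    }
}
```

This mirrors the reported problem: the offsets key is cleaned up, but without a metadata tombstone the group's metadata record is retained indefinitely.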





[jira] [Updated] (KAFKA-16543) There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0

2024-04-12 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-16543:
---
Description: 
In the `cleanupGroupMetadata` method, tombstone messages are written to delete 
the group's MetadataKey only when the group is in the Dead state and the 
generation is greater than 0. The comment indicates: 'We avoid writing the 
tombstone when the generationId is 0, since this group is only using Kafka for 
offset storage.' This means that groups that only use Kafka for offset storage 
should not be deleted. However, there is a situation where, for example, Flink 
commits offsets with a generationId equal to -1. If ApiKeys.DELETE_GROUPS is 
called to delete this group, Flink's group metadata will never be deleted. Yet, 
the logic above has already cleaned up commitKey by writing tombstone messages 
with `removedOffsets`. Therefore, the actual manifestation is: the group no 
longer exists (since the offsets have been cleaned up, there is no possibility 
of adding the group back to the `groupMetadataCache` unless offsets are 
committed again with the same group name), but the corresponding group metadata 
information still exists in __consumer_offsets. This leads to the problem that 
deleting the group does not completely clean up its related information.

The group's state is set to Dead only in the following three situations:
1. The group information is unloaded
2. The group is deleted by ApiKeys.DELETE_GROUPS
3. All offsets of the group have expired or been removed.

Therefore, since the group is already in the Dead state and has been removed 
from the `groupMetadataCache`, why not directly clean up all the information of 
the group? Even if it is only used for storing offsets.

  was:
In the `cleanupGroupMetadata` method, tombstone messages is written to delete 
the group's MetadataKey only when the group is in the Dead state and the 
generation is greater than 0. The comment indicates: 'We avoid writing the 
tombstone when the generationId is 0, since this group is only using Kafka for 
offset storage.' This means that groups that only use Kafka for offset storage 
should not be deleted. However, there is a situation where, for example, Flink 
commit offsets with a generationId equal to -1. If the ApiKeys.DELETE_GROUPS is 
called to delete this group, Flink's group metadata will never be deleted. Yet, 
the logic above has already cleaned up commitKey by writing tombstone messages 
with removedOffsets. Therefore, the actual manifestation is: the group no 
longer exists (since the offsets have been cleaned up, there is no possibility 
of adding the group back to the `groupMetadataCache` unless offsets are 
committed again with the same group name), but the corresponding group metadata 
information still exists in __consumer_offsets. This leads to the problem that 
deleting the group does not completely clean up its related information.

The group's state is set to Dead only in the following three situations:
1. The group information is unloaded
2. The group is deleted by ApiKeys.DELETE_GROUPS
3. All offsets of the group have expired or removed.

Therefore, since the group is already in the Dead state and has been removed 
from the `groupMetadataCache`, why not directly clean up all the information of 
the group? Even if it is only used for storing offsets.


> There may be ambiguous deletions in the `cleanupGroupMetadata` when the 
> generation of the group is less than or equal to 0
> --
>
> Key: KAFKA-16543
> URL: https://issues.apache.org/jira/browse/KAFKA-16543
> Project: Kafka
>  Issue Type: Bug
>  Components: group-coordinator
>Affects Versions: 3.6.2
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>
> In the `cleanupGroupMetadata` method, tombstone messages is written to delete 
> the group's MetadataKey only when the group is in the Dead state and the 
> generation is greater than 0. The comment indicates: 'We avoid writing the 
> tombstone when the generationId is 0, since this group is only using Kafka 
> for offset storage.' This means that groups that only use Kafka for offset 
> storage should not be deleted. However, there is a situation where, for 
> example, Flink commit offsets with a generationId equal to -1. If the 
> ApiKeys.DELETE_GROUPS is called to delete this group, Flink's group metadata 
> will never be deleted. Yet, the logic above has already cleaned up commitKey 
> by writing tombstone messages with `removedOffsets`. Therefore, the actual 
> manifestation is: the group no longer exists (since the offsets have been 
> cleaned up, there is no possibility of adding the group back to the 
> `groupMetadataCache` unless offsets are committed again with the same gr

[jira] [Created] (KAFKA-16543) There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0

2024-04-12 Thread hudeqi (Jira)
hudeqi created KAFKA-16543:
--

 Summary: There may be ambiguous deletions in the 
`cleanupGroupMetadata` when the generation of the group is less than or equal 
to 0
 Key: KAFKA-16543
 URL: https://issues.apache.org/jira/browse/KAFKA-16543
 Project: Kafka
  Issue Type: Bug
  Components: group-coordinator
Affects Versions: 3.6.2
Reporter: hudeqi
Assignee: hudeqi


In the `cleanupGroupMetadata` method, tombstone messages are written to delete 
the group's MetadataKey only when the group is in the Dead state and the 
generation is greater than 0. The comment indicates: 'We avoid writing the 
tombstone when the generationId is 0, since this group is only using Kafka for 
offset storage.' This means that groups that only use Kafka for offset storage 
should not be deleted. However, there is a situation where, for example, Flink 
commits offsets with a generationId equal to -1. If ApiKeys.DELETE_GROUPS is 
called to delete this group, Flink's group metadata will never be deleted. Yet, 
the logic above has already cleaned up commitKey by writing tombstone messages 
with removedOffsets. Therefore, the actual manifestation is: the group no 
longer exists (since the offsets have been cleaned up, there is no possibility 
of adding the group back to the `groupMetadataCache` unless offsets are 
committed again with the same group name), but the corresponding group metadata 
information still exists in __consumer_offsets. This leads to the problem that 
deleting the group does not completely clean up its related information.

The group's state is set to Dead only in the following three situations:
1. The group information is unloaded
2. The group is deleted by ApiKeys.DELETE_GROUPS
3. All offsets of the group have expired or been removed.

Therefore, since the group is already in the Dead state and has been removed 
from the `groupMetadataCache`, why not directly clean up all the information of 
the group? Even if it is only used for storing offsets.





Re: [PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]

2024-04-12 Thread via GitHub


chiacyu commented on code in PR #15678:
URL: https://github.com/apache/kafka/pull/15678#discussion_r1562574967


##
server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java:
##
@@ -172,10 +172,11 @@ public void testAssignmentAggregation() throws 
InterruptedException {
 manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), DIR_3, () -> { 
});
 manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), DIR_1, () -> { 
});
 manager.onAssignment(new TopicIdPartition(TOPIC_2, 5), DIR_2, () -> { 
});
-while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) {
+TestUtils.waitForCondition(() -> {
 time.sleep(100);
 manager.wakeup();
-}
+return readyToAssert.await(1, TimeUnit.MILLISECONDS);
+}, 1000, "Wait for ready to assert.");

Review Comment:
   Great comments, just applied. Thanks!
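For readers unfamiliar with the pattern being adopted here, a simplified standalone version of a `waitForCondition`-style helper looks like this (a sketch under stated assumptions, not Kafka's actual `TestUtils` implementation):

```java
import java.util.function.BooleanSupplier;

public class WaitDemo {
    // Hypothetical helper mirroring the shape of TestUtils.waitForCondition:
    // poll until the condition holds, or fail once the timeout elapses.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            Thread.sleep(10); // back off briefly between polls
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Condition becomes true after ~50 ms, well within the 1 s timeout.
        waitForCondition(() -> System.currentTimeMillis() - start >= 50,
                1000, "condition never became true");
        System.out.println("condition met");
    }
}
```

Compared with a bare `while`/`await` loop, the helper gives an overall deadline and a descriptive failure message, which is what the review suggestion buys in the test above.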






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-12 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1562500382


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest 
makeHeartbeatRequest(final long curr
 heartbeatRequestState.onSendAttempt(currentTimeMs);
 membershipManager.onHeartbeatRequestSent();
 metricsManager.recordHeartbeatSentMs(currentTimeMs);
+// Reset timer when sending the request, to make sure that, if waiting 
for the interval,
+// we don't include the response time (which may introduce delay)

Review Comment:
   Do we really need this comment?
   Additionally, I could not find a verification of this call in unit tests. 
Since you added a comment it seems to be important enough for a verification. 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -231,6 +231,34 @@ public void testTimerNotDue() {
 assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
 }
 
+@Test
+public void testHeartbeatNotSentIfAnotherOnInFlight() {
+mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+// Heartbeat sent (no response received)
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a " +
+"previous on in-flight");
+
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent when the " +
+"interval expires if there is a previous HB request in-flight");
+
+// Receive response for the inflight. The next HB should be sent on 
the next poll after
+// the interval expires.
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, 
Errors.NONE));
+time.sleep(DEFAULT_RETRY_BACKOFF_MS);
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+

Review Comment:
   ```suggestion
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -231,6 +231,34 @@ public void testTimerNotDue() {
 assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
 }
 
+@Test
+public void testHeartbeatNotSentIfAnotherOnInFlight() {

Review Comment:
   typo
   ```suggestion
   public void testHeartbeatNotSentIfAnotherOneInFlight() {
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
 return heartbeatTimer.remainingMs();
 }
 
+public void onFailedAttempt(final long currentTimeMs) {
+// Expire timer to allow sending HB after a failure. After a 
failure, a next HB may be
+// needed with backoff (ex. errors that lead to retries, like 
coordinator load error),
+// or immediately (ex. errors that lead to rejoining, like fencing 
errors).
+heartbeatTimer.update(heartbeatTimer.currentTimeMs() + 
heartbeatTimer.remainingMs());

Review Comment:
   What about adding a method to `Timer` that expires the timer without 
updating it with a time point in the future. Alternatively, I think you could 
reset the `Timer` to 0 with `heartbeatTimer.reset(0)`.
   Do we need a verification in the unit tests for this?
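To make the `reset(0)` suggestion concrete, here is a minimal standalone timer (a hypothetical class modeled loosely on the shape of Kafka's `Timer`, not the real one): resetting the deadline to "now" expires the timer without having to push its clock forward.

```java
public class TimerDemo {
    // Hypothetical minimal timer: tracks a current time and a deadline.
    static class Timer {
        private long currentMs;
        private long deadlineMs;

        Timer(long nowMs, long timeoutMs) {
            currentMs = nowMs;
            deadlineMs = nowMs + timeoutMs;
        }

        long remainingMs() { return Math.max(0, deadlineMs - currentMs); }

        // Rebase the deadline relative to the timer's current time.
        void reset(long timeoutMs) { deadlineMs = currentMs + timeoutMs; }

        boolean isExpired() { return remainingMs() == 0; }
    }

    public static void main(String[] args) {
        Timer heartbeatTimer = new Timer(0, 1000);
        System.out.println("expired before reset: " + heartbeatTimer.isExpired());
        heartbeatTimer.reset(0); // deadline moves to "now": timer is expired
        System.out.println("expired after reset(0): " + heartbeatTimer.isExpired());
    }
}
```

The effect is the same as `update(currentTimeMs + remainingMs())`, but without pretending the clock has advanced, which is why the reviewer suggests it as the cleaner alternative.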






[PR] MINOR: Various cleanups in clients [kafka]

2024-04-12 Thread via GitHub


mimaison opened a new pull request, #15705:
URL: https://github.com/apache/kafka/pull/15705

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15776: Use the FETCH request timeout as the delay timeout for DelayedRemoteFetch [kafka]

2024-04-12 Thread via GitHub


showuon commented on PR #14778:
URL: https://github.com/apache/kafka/pull/14778#issuecomment-2051465918

   Correction: For this:
   > some users might feel surprised when their fetch doesn't respond in 
fetch.max.wait.ms time.
   
   This is wrong. Even if the remote reading is not completed yet, the fetch 
request will still return in `fetch.max.wait.ms`. It's just an empty response.





[jira] [Commented] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh

2024-04-12 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836529#comment-17836529
 ] 

Luke Chen commented on KAFKA-16510:
---

OK, I see. If it's an issue of an incorrect SASL_SSL setting, then I think it's 
a known issue. Thanks for the reply.
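One common mechanism behind this OOM pattern (hedged — the exact cause in this report is not confirmed in the thread) is a security-protocol mismatch: a client expecting a 4-byte big-endian size prefix instead reads the start of a TLS record, and those bytes decode to an enormous "message size" that the client then tries to allocate. A standalone sketch with a hypothetical byte sequence:

```java
import java.nio.ByteBuffer;

public class FrameSizeDemo {
    public static void main(String[] args) {
        // First bytes of a TLS handshake record (content type 0x16, version 3.3),
        // misread as a big-endian 4-byte frame length by a plaintext client.
        byte[] wireBytes = {0x16, 0x03, 0x03, 0x00};
        int bogusSize = ByteBuffer.wrap(wireBytes).getInt();
        System.out.println("decoded frame size: " + bogusSize); // ~352 MiB
    }
}
```

Attempting to allocate a buffer of that decoded size is the kind of request that surfaces as `java.lang.OutOfMemoryError: Java heap space` during `NetworkReceive.readFrom`.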

> java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
> --
>
> Key: KAFKA-16510
> URL: https://issues.apache.org/jira/browse/KAFKA-16510
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.7.0
>Reporter: Hiro
>Priority: Major
>
> kafka-metadata-quorum is not available in SASL_PLAIN.
> I got this error. I only use SASL_PLAIN, not SSL.
> I found a person with a similar situation, but he is using mTLS.
> https://issues.apache.org/jira/browse/KAFKA-16006
> {code:java}
> sh-4.2$ /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server <broker ip>:9093 --command-config controller-admin.properties  describe --replication
> [2024-04-11 04:12:54,128] ERROR Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
> at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
> at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
> at 
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:476)
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:573)
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:251)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435)
> at java.base/java.lang.Thread.run(Thread.java:840)
> org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has 
> exited. Call: describeMetadataQuorum
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has 
> exited. Call: describeMetadataQuorum
> at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
> at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
> at 
> org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeReplication(MetadataQuorumCommand.java:158)
> at 
> org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106)
> at 
> org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:62)
> at 
> org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:57)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient 
> thread has exited. Call: describeMetadataQuorum {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-12 Thread via GitHub


showuon commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1562307197


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment:
   I agree option (b) is better. But for this:
   > We can still run into trouble if the directory with the main replica is offline. At some point that will cause a crash if the directory ever comes back online. But there's nothing we can do about that here. Maybe future work could improve how the broker handles loading conflicting logs.
   
   I believe this PR: https://github.com/apache/kafka/pull/15335 should already fix this issue. @OmniaGM, could you help confirm it? If not, I think we should not promote the future log when the main replica is offline, to avoid potential issues later.






Re: [PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]

2024-04-12 Thread via GitHub


VedarthConfluent commented on code in PR #15704:
URL: https://github.com/apache/kafka/pull/15704#discussion_r1562289692


##
docker/generate_kafka_pr_template.sh:
##
@@ -0,0 +1,61 @@
+#!/usr/bin/env bash

Review Comment:
   Add the license header at the beginning of all new files






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-12 Thread via GitHub


showuon commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1562284319


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")
+      }
+
+      currentLogs.put(tp, futureLog)
+      futureLog.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment:
   I can see we basically do the same things as `replaceCurrentWithFutureLog`, so why duplicate the code here again? Could we call `replaceCurrentWithFutureLog` with additional parameters indicating which actions to skip?
   
   Also, as the code stands we don't close the `currentLog`, which is a potential resource leak. Using `replaceCurrentWithFutureLog` would avoid that issue. WDYT?
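The leak concern above can be illustrated generically: routing every replacement through one shared path that always closes the displaced log means callers cannot forget to release it. A hedged Java sketch (the `Log` type here is a hypothetical stand-in, not Kafka's `UnifiedLog`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ReplaceSketch {
    // Hypothetical stand-in for a log that holds open file handles.
    static class Log implements AutoCloseable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    static final Map<String, Log> currentLogs = new ConcurrentHashMap<>();

    // Single replacement path: the displaced log is always closed here,
    // so no caller can leak its file handles by forgetting the cleanup.
    static Log replaceCurrentWithFutureLog(String tp, Log futureLog) {
        Log old = currentLogs.put(tp, futureLog);
        if (old != null) {
            old.close(); // without this, the old log's handles would leak
        }
        return old;
    }

    public static void main(String[] args) {
        Log old = new Log();
        currentLogs.put("topic1-0", old);
        replaceCurrentWithFutureLog("topic1-0", new Log());
        System.out.println(old.closed); // prints true
    }
}
```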






[jira] [Commented] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh

2024-04-12 Thread Hiro (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836517#comment-17836517
 ] 

Hiro commented on KAFKA-16510:
--

[~showuon]
No, the problem occurred with all scripts.
When I tried it yesterday, I thought the problem was resolved by changing the heap size, but now the same problem occurs even if I set KAFKA_HEAP_OPTS.
Currently, this problem can be reproduced with an incorrect SASL_SSL setting, or by leaving the SASL_SSL configs out of --command-config entirely.
Thanks.

{code}
sh-4.2$  /opt/kafka/bin/kafka-topics.sh --bootstrap-server :9093 
--list
Error while executing topic command : The AdminClient thread has exited. Call: 
listTopics
[2024-04-12 09:00:55,990] ERROR 
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has 
exited. Call: listTopics
 (org.apache.kafka.tools.TopicCommand)
[2024-04-12 09:00:55,997] ERROR Uncaught exception in thread 
'kafka-admin-client-thread | adminclient-1': 
(org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
at 
org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
at 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
at 
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
at 
org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435)
at java.base/java.lang.Thread.run(Thread.java:840)
{code}
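One plausible mechanism for these OOMs — an assumption based on the stack trace, not confirmed in this thread — is that with a mismatched security protocol the client interprets the peer's first bytes (e.g. a TLS record) as the 4-byte big-endian frame length that Kafka's wire protocol expects, and then tries to allocate a buffer of that bogus size:

```java
import java.nio.ByteBuffer;

public class FrameSizeSketch {
    public static void main(String[] args) {
        // A TLS handshake record starts with bytes like 0x16 0x03 0x03 ...;
        // read as a big-endian int "frame size" it looks enormous.
        byte[] tlsPrefix = {0x16, 0x03, 0x03, 0x02};
        int misreadSize = ByteBuffer.wrap(tlsPrefix).getInt();
        System.out.println(misreadSize); // prints 369296130 (~369 MB)
        // ByteBuffer.allocate(misreadSize) would then throw OutOfMemoryError
        // on a small heap, matching the NetworkReceive.readFrom frames above.
    }
}
```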



[jira] [Updated] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh

2024-04-12 Thread Hiro (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hiro updated KAFKA-16510:
-
Affects Version/s: 3.7.0
   (was: 3.4.1)






[jira] [Comment Edited] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays

2024-04-12 Thread Walter Hernandez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836506#comment-17836506
 ] 

Walter Hernandez edited comment on KAFKA-2499 at 4/12/24 8:45 AM:
--

This is indeed duplicated by the referenced earlier ticket, where there is a 
patch available; however, the patch needs a rebase (goes back to v0.9.0.1).

In addition, the changes broke some system tests that were not resolved.

With the referenced ticket not having had any activity since 2018 and already having an associated PR, I would like to take on this effort for the latest version.


was (Author: JIRAUSER305029):
This is indeed duplicated by the referenced earlier ticket, where there is a 
patch available; however, the patch needs a rebase (goes back to v 1.0.0 ).

 

In addition, the changes broke some system tests that were not resolved.

 

With the referenced ticket not having any activity since 2018 and already has 
an associated PR, I would like to take on this effort for the latest version.

> kafka-producer-perf-test should use something more realistic than empty byte 
> arrays
> ---
>
> Key: KAFKA-2499
> URL: https://issues.apache.org/jira/browse/KAFKA-2499
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ben Stopford
>Assignee: Walter Hernandez
>Priority: Major
>  Labels: newbie
>
> ProducerPerformance.scala (There are two of these, one used by the shell 
> script and one used by the system tests. Both exhibit this problem)
> creates messages from empty byte arrays. 
> This is likely to provide unrealistically fast compression and hence 
> unrealistically fast results. 
> Suggest randomised bytes or more realistic sample messages are used. 
> Thanks to Prabhjot Bharaj for reporting this. 



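A minimal sketch of the randomized-payload suggestion above (a hypothetical helper, not the actual patch attached to the earlier ticket): zero-filled arrays compress to almost nothing, while seeded random bytes are incompressible yet still reproducible across benchmark runs:

```java
import java.util.Random;

public class PayloadSketch {
    // Zero-filled payloads compress almost to nothing, inflating throughput
    // numbers; random bytes are incompressible and far more realistic.
    // A fixed seed keeps runs comparable with each other.
    static byte[] randomPayload(int size, long seed) {
        byte[] payload = new byte[size];
        new Random(seed).nextBytes(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] p = randomPayload(1024, 42L);
        System.out.println(p.length); // prints 1024
    }
}
```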


[jira] [Commented] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays

2024-04-12 Thread Walter Hernandez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836506#comment-17836506
 ] 

Walter Hernandez commented on KAFKA-2499:
-

This is indeed duplicated by the referenced earlier ticket, where there is a patch available; however, the patch needs a rebase (goes back to v1.0.0).

 

In addition, the changes broke some system tests that were not resolved.

 

With the referenced ticket not having had any activity since 2018 and already having an associated PR, I would like to take on this effort for the latest version.






[jira] [Updated] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays

2024-04-12 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez updated KAFKA-2499:

Issue Type: Improvement  (was: Bug)






[jira] [Assigned] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays

2024-04-12 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-2499:
---

Assignee: Walter Hernandez






[jira] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays

2024-04-12 Thread Walter Hernandez (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-2499 ]


Walter Hernandez deleted comment on KAFKA-2499:
-

was (Author: JIRAUSER305029):
This indeed duplicates the referenced earlier ticket, where there is a patch 
available.

However, this patch needs a rebase (goes back to v2.1.0).






[jira] [Commented] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays

2024-04-12 Thread Walter Hernandez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836503#comment-17836503
 ] 

Walter Hernandez commented on KAFKA-2499:
-

This indeed duplicates the referenced earlier ticket, where there is a patch 
available.

However, this patch needs a rebase (goes back to v2.1.0).






[jira] [Commented] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh

2024-04-12 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836501#comment-17836501
 ] 

Luke Chen commented on KAFKA-16510:
---

[~hiroa] , I'd like to confirm that only `kafka-metadata-quorum.sh` script 
causes OOM issue, not other scripts, like kafka-topics.sh, kafka-configs.sh. Is 
that right?






[jira] [Comment Edited] (KAFKA-16511) Leaking tiered segments

2024-04-12 Thread Francois Visconte (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836472#comment-17836472
 ] 

Francois Visconte edited comment on KAFKA-16511 at 4/12/24 8:24 AM:


I ran kafka-delete-records.sh and it did the trick


{code:java}
[RemoteLogManager=10015 
partition=5G8Ai8kBSwmQ3Ln4QRY5rA:raw_spans_datadog_3543-765] Deleted remote log 
segment RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, 
id=8dP13VDYSaiFlubl9SNBTQ} due to leader-epoch-cache truncation. Current 
earliest-epoch-entry: EpochEntry(epoch=21, startOffset=2978397), 
segment-end-offset: 2978396 and segment-epochs: [7]
{code}

Do you happen to know what triggers the issue?



was (Author: JIRAUSER288982):
I ran kafka-delete-records.sh and it did the trick


{code:java}
[RemoteLogManager=10015 
partition=5G8Ai8kBSwmQ3Ln4QRY5rA:raw_spans_datadog_3543-765] Deleted remote log 
segment RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, 
id=8dP13VDYSaiFlubl9SNBTQ} due to leader-epoch-cache truncation. Current 
earliest-epoch-entry: EpochEntry(epoch=21, startOffset=2978397), 
segment-end-offset: 2978396 and segment-epochs: [7]
{code}


> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics that have not been written to for a few days (they have 12h retention) where some data remains on tiered storage (in our case S3) and is never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segment, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remo

[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-12 Thread Francois Visconte (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836472#comment-17836472
 ] 

Francois Visconte commented on KAFKA-16511:
---

I ran kafka-delete-records.sh and it did the trick


{code:java}
[RemoteLogManager=10015 
partition=5G8Ai8kBSwmQ3Ln4QRY5rA:raw_spans_datadog_3543-765] Deleted remote log 
segment RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, 
id=8dP13VDYSaiFlubl9SNBTQ} due to leader-epoch-cache truncation. Current 
earliest-epoch-entry: EpochEntry(epoch=21, startOffset=2978397), 
segment-end-offset: 2978396 and segment-epochs: [7]
{code}
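For reference, a hedged sketch of how the log-start-offset can be advanced with the stock tooling — the topic, partition, and target offset below are placeholders, not the incident's real values:

```shell
# Offsets file consumed by the tool (version 1 format):
cat > /tmp/delete-records.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "topic1", "partition": 765, "offset": 2978397 }
  ]
}
EOF

# Moves the log start offset of topic1-765 up to the given offset, after
# which the leaked remote segments fall outside the retained range:
bin/kafka-delete-records.sh \
  --bootstrap-server localhost:9092 \
  --offset-json-file /tmp/delete-records.json
```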


> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics that were not written to for a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> is never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segments, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look at one of the leaked segments, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegme

Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-12 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1562184559


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
 try {
 if (delete.execute())
 LOGGER.info("Deleted {} {}.", fileType, 
file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new 
directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid 
orphan file.
+Pattern dirPattern = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+Matcher dirMatcher = dirPattern.matcher(file.getParent());
+if (dirMatcher.matches()) {
+String topicPartitionAbsolutePath = dirMatcher.group(1) + 
"-" + dirMatcher.group(2);
+File fallbackFile = new File(topicPartitionAbsolutePath, 
file.getName());
+if (fallbackFile.exists() && fallbackFile.delete()) {

Review Comment:
   Does the file name always end with `.delete`? Should we check it before 
deletion?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
 try {
 if (delete.execute())
 LOGGER.info("Deleted {} {}.", fileType, 
file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new 
directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid 
orphan file.
+Pattern dirPattern = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");

Review Comment:
   1. Why does it contain `delete` in the end?
   2. Unfortunately, the topic name could contain `-` or `.`, so it's unsafe to 
do regex like this.
   
   I'm thinking we can pass `topicPartition` as a parameter into 
`deleteTypeIfExists` so that we don't have to do further regex matching like 
this. And then just verify fileName.endsWith("future"), because a normal folder 
name should always end with a number (the partition number) instead of "future". WDYT?
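The suffix-based check being suggested could look roughly like this — a minimal sketch, not the actual patch; the helper name is an assumption and only the "-future" case is shown:

```java
import java.io.File;

// A "future" replica directory is named "<topic>-<partition>.<uuid>-future",
// so stripping everything from the last '.' recovers the original
// "<topic>-<partition>" directory even when the topic name itself
// contains '-' or '.'.
public class FallbackPath {
    static File fallbackFile(File file) {
        File dir = file.getParentFile();
        if (dir == null || !dir.getName().endsWith("-future")) {
            return null; // not moved by an alter-log-dir, no fallback needed
        }
        String name = dir.getName();
        int dot = name.lastIndexOf('.');
        if (dot < 0) {
            return null; // unexpected layout, don't guess
        }
        File topicPartitionDir = new File(dir.getParentFile(), name.substring(0, dot));
        return new File(topicPartitionDir, file.getName());
    }
}
```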



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
 try {
 if (delete.execute())
 LOGGER.info("Deleted {} {}.", fileType, 
file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new 
directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid 
orphan file.
+Pattern dirPattern = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+Matcher dirMatcher = dirPattern.matcher(file.getParent());
+if (dirMatcher.matches()) {
+String topicPartitionAbsolutePath = dirMatcher.group(1) + 
"-" + dirMatcher.group(2);
+File fallbackFile = new File(topicPartitionAbsolutePath, 
file.getName());
+if (fallbackFile.exists() && fallbackFile.delete()) {
+LOGGER.warn("Fallback to delete {} {}.", fileType, 
fallbackFile.getAbsolutePath());

Review Comment:
   Why did we use `warn` here? I think we can use `info` since it's expected 
behavior. WDYT?



##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends 
BaseRequestTest {
 assertEquals(Errors.KAFKA_STORAGE_ERROR, 
findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRe

[jira] [Assigned] (KAFKA-16522) Admin client changes for adding and removing voters

2024-04-12 Thread Quoc Phong Dang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Quoc Phong Dang reassigned KAFKA-16522:
---

Assignee: Quoc Phong Dang

> Admin client changes for adding and removing voters
> ---
>
> Key: KAFKA-16522
> URL: https://issues.apache.org/jira/browse/KAFKA-16522
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Assignee: Quoc Phong Dang
>Priority: Major
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes#KIP853:KRaftControllerMembershipChanges-Admin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13636) Committed offsets could be deleted during a rebalance if a group did not commit for a while

2024-04-12 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836455#comment-17836455
 ] 

zhangzhisheng commented on KAFKA-13636:
---

The Kafka broker version is 2.7.2, the client version is 2.5.2, and 
auto.offset.reset is earliest.

There have been instances where committed offsets were deleted on the consumer 
side.
{code:java}
org.apache.kafka.clients.consumer.internals.SubscriptionState 
SubscriptionState.java:397 [trace=,span=] - [Consumer 
clientId=app1_a2acc55fc0bbac81, groupId=gid-app1] Resetting offset for 
partition topic-4 to offset 3577938321.
 {code}

> Committed offsets could be deleted during a rebalance if a group did not 
> commit for a while
> ---
>
> Key: KAFKA-13636
> URL: https://issues.apache.org/jira/browse/KAFKA-13636
> Project: Kafka
>  Issue Type: Bug
>  Components: core, offset manager
>Affects Versions: 2.4.0, 2.5.1, 2.6.2, 2.7.2, 2.8.1, 3.0.0
>Reporter: Damien Gasparina
>Assignee: Prince Mahajan
>Priority: Major
> Fix For: 3.0.1, 2.8.2, 3.2.0, 3.1.1
>
>
> The group coordinator might delete invalid offsets during a group rebalance. 
> During a rebalance, the coordinator is relying on the last commit timestamp 
> ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state 
> modification {_}timestamp (currentStateTimestamp{_}) to detect expired 
> offsets.
>  
> This is relatively easy to reproduce by playing with 
> group.initial.rebalance.delay.ms, offset.retention.minutes and 
> offset.check.retention.interval, I uploaded an example on: 
> [https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention] .
> This script does:
> * Start a broker with: offset.retention.minutes=2, 
> offset.check.retention.interval.ms=1000, group.initial.rebalance.delay=2
>  * Produced 10 messages
>  * Create a consumer group to consume 10 messages, and disable auto.commit to 
> only commit a few times
>  * Wait 3 minutes, then the Consumer get a {{kill -9}}
>  * Restart the consumer after a few seconds
>  * The consumer restart from {{auto.offset.reset}} , the offset got removed
>  
> The cause is due to the GroupMetadata.scala:
>  * When the group get emptied, the {{subscribedTopics}} is set to 
> {{Set.empty}} 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521])
>  * When the new member joins, we add the new member right away in the group ; 
> BUT the {{subscribedTopics}} is only updated once the migration is over (in 
> the initNewGeneration) (which could take a while due to the 
> {{{}group.initial.rebalance.delay{}}})
>  * When the log cleaner got executed,  {{subscribedTopics.isDefined}} returns 
> true as {{Set.empty != None}} (the underlying condition)
>  * Thus we enter 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785]
>  with an empty {{subscribedTopics}} list and we are relying on the 
> {{commitTimestamp}} regardless of the {{currentStateTimestamp}}
>  
> This seems to be a regression generated by KIP-496 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges
>  (KAFKA-8338, KAFKA-8370)
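The `Set.empty != None` condition above can be mimicked in plain Java to see why the check passes — an illustrative snippet, not Kafka code:

```java
import java.util.Optional;
import java.util.Set;

public class SubscribedTopicsGap {
    public static void main(String[] args) {
        // When the group becomes empty, subscribedTopics is Some(Set.empty),
        // not None — so the "is it defined?" check still answers true.
        Optional<Set<String>> subscribedTopics = Optional.of(Set.of());
        // The expiration path only tests definedness, so an empty set slips
        // through and offsets are expired on commitTimestamp alone.
        System.out.println(subscribedTopics.isPresent()); // true
    }
}
```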



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-12 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836444#comment-17836444
 ] 

Kamal Chandraprakash commented on KAFKA-16511:
--

The segment deletion might be stuck due to 
[RemoteLogManager#isRemoteSegmentWithinLeaderEpochs|https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1241]
 check. The {{log-start-offset}} for this partition (765) might have been moved 
using the {{kafka-delete-records.sh}} script, so the check fails to mark it as 
a valid segment.
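For intuition only, a much-simplified sketch of the kind of validation that check performs — the method name is real, but this signature, body, and offset handling are assumptions, not the actual implementation:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class SegmentEpochCheck {
    // A remote segment is only treated as part of the current log while it
    // lies above log-start-offset and every epoch it carries is still present
    // in the broker's leader-epoch chain.
    static boolean isWithinLeaderEpochs(NavigableMap<Integer, Long> segmentEpochs,
                                        NavigableMap<Integer, Long> leaderEpochs,
                                        long logStartOffset,
                                        long segmentEndOffset) {
        if (segmentEndOffset < logStartOffset) {
            return false; // e.g. log start moved forward by kafka-delete-records.sh
        }
        // Segment epochs truncated out of the cache make the segment invalid.
        return leaderEpochs.keySet().containsAll(segmentEpochs.keySet());
    }
}
```

With the values from this incident (segment epochs [7], earliest cache entry epoch 21), such a check would answer false, which is consistent with deletion being skipped.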

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics that were not written to for a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> is never deleted.
> 
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segments, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> Which looks right because we can see logs from both the plugin and remote log 
> manager indicating that the remote log segment was removed.
> Now if I look at one of the leaked segments, here is what I see
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3

[PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]

2024-04-12 Thread via GitHub


KrishVora01 opened a new pull request, #15704:
URL: https://github.com/apache/kafka/pull/15704

   This PR adds code to support Apache Kafka Docker Official Images.
   The files added include:
   1. .github/workflows/prepare_docker_official_image_source.yml This adds 
github actions workflow to create artifact containing the hardcoded static 
source files for docker official images.
   2. docker/prepare_docker_official_image_source.py This is the python script 
to create sub-directory containing the hardcoded static source files for docker 
official images (which is later pushed as an artifact).
   3. .github/workflows/docker_official_image_build_and_test.yml This adds 
github actions workflow to build and test docker official image.
   4. docker/docker_official_image_build_test.py This is the python script to 
build and test docker official image.
   5. docker/common.sh and docker/generate_kafka_pr_template.sh These are 
scripts to create the PR template to be raised in the docker official images 
repo. 
   6. docker/docker_official_images/.gitignore This is the new directory to 
keep Docker Official Image assets.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16373: Adding Apache Kafka Docker Official Images Directory [kafka]

2024-04-12 Thread via GitHub


KrishVora01 closed pull request #15703: KAFKA-16373: Adding Apache Kafka Docker 
Official Images Directory 
URL: https://github.com/apache/kafka/pull/15703


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org