[jira] [Updated] (KAFKA-15701) Allow use of user policy in CreateTopicPolicy

2023-10-26 Thread Jiao Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiao Zhang updated KAFKA-15701:
---
Description: 
One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
topic creation based on the requesting user.

In particular, for secured clusters we add ACLs to specific users to allow 
topic creation. At the same time, we need to design customized create-topic 
policies for different users. For example, for user A, topic creation is 
allowed only when the number of partitions is within a limit. For user B, we 
allow topic creation without any check. From the standpoint of a Kafka service 
provider, user A can be thought of as an ordinary user of the Kafka service 
and user B as an internal user for cluster management.

For this need, we patched our local fork of Kafka by passing the user 
principal in KafkaApis.

One place that needs to be revised is here: 
[https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]

I think it's natural to also support this kind of usage upstream, so I raised 
this Jira to ask for the community's ideas.

  was:
One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
topic creation based on the requesting user.

In particular, for secured clusters we add ACLs to specific users to allow 
topic creation. At the same time, we need to design customized create-topic 
policies for different users. For example, for user A, topic creation is 
allowed only when the number of partitions is within a limit. For user B, we 
allow topic creation without any check. From the standpoint of a Kafka service 
provider, user A can be thought of as an ordinary user of the Kafka service 
and user B as an internal user for cluster management.

For this need, we patched our local fork of Kafka by passing the user 
principal in KafkaApis.

One place that needs to be revised is here: 
[https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]

Since it seems natural to support this kind of usage upstream as well, I 
raised this Jira to ask for the community's ideas.


> Allow use of user policy in CreateTopicPolicy 
> --
>
> Key: KAFKA-15701
> URL: https://issues.apache.org/jira/browse/KAFKA-15701
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiao Zhang
>Priority: Minor
>
> One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
> topic creation based on the requesting user.
> In particular, for secured clusters we add ACLs to specific users to allow 
> topic creation. At the same time, we need to design customized create-topic 
> policies for different users. For example, for user A, topic creation is 
> allowed only when the number of partitions is within a limit. For user B, we 
> allow topic creation without any check. From the standpoint of a Kafka service 
> provider, user A can be thought of as an ordinary user of the Kafka service 
> and user B as an internal user for cluster management.
> For this need, we patched our local fork of Kafka by passing the user 
> principal in KafkaApis.
> One place that needs to be revised is here: 
> [https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]
> I think it's natural to also support this kind of usage upstream, so I raised 
> this Jira to ask for the community's ideas.
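For context, a rough sketch of what such a principal-aware policy could look like. This is only a sketch under the assumption that the principal were exposed to the plugin; CreateTopicPolicy.RequestMetadata does not currently carry it, so the principal() accessor and the partition limit below are hypothetical.

```java
import java.util.Map;

import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

public class PerUserCreateTopicPolicy implements CreateTopicPolicy {

    // Illustrative limit applied to ordinary ("user A" style) users.
    private static final int MAX_PARTITIONS_FOR_REGULAR_USERS = 32;

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        // Hypothetical accessor: exposing the principal here is exactly what this Jira asks for.
        String principal = requestMetadata.principal();

        // "User B" style internal users may create topics without further checks.
        if ("internal-admin".equals(principal)) {
            return;
        }

        // "User A" style users are limited in how many partitions they may request.
        Integer numPartitions = requestMetadata.numPartitions();
        if (numPartitions != null && numPartitions > MAX_PARTITIONS_FOR_REGULAR_USERS) {
            throw new PolicyViolationException("Topic " + requestMetadata.topic()
                + " requests " + numPartitions + " partitions, above the limit of "
                + MAX_PARTITIONS_FOR_REGULAR_USERS);
        }
    }

    @Override
    public void close() { }
}
```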



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15701) Allow use of user policy in CreateTopicPolicy

2023-10-26 Thread Jiao Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiao Zhang updated KAFKA-15701:
---
Description: 
One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
topic creation based on the requesting user.

In particular, for secured clusters we add ACLs to specific users to allow 
topic creation. At the same time, we need to design customized create-topic 
policies for different users. For example, for user A, topic creation is 
allowed only when the number of partitions is within a limit. For user B, we 
allow topic creation without any check. From the standpoint of a Kafka service 
provider, user A can be thought of as an ordinary user of the Kafka service 
and user B as an internal user for cluster management.

For this need, we patched our local fork of Kafka by passing the user 
principal in KafkaApis.

One place that needs to be revised is here: 
[https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]

Since it seems natural to support this kind of usage upstream as well, I 
raised this Jira to ask for the community's ideas.

  was:
One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
topic creation based on the requesting user.

In particular, for secured clusters we add ACLs to specific users to allow 
topic creation. At the same time, we need to design customized create-topic 
policies for different users. For example, for user A, topic creation is 
allowed only when the number of partitions is within a limit. For user B, we 
allow topic creation without any check. As the Kafka service provider, we 
think of user A as an ordinary user of the Kafka service and user B as an 
internal user for cluster management.

For this need, we patched our local fork of Kafka by passing the user 
principal in KafkaApis.

One place that needs to be revised is here: 
[https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]

Since it seems natural to support this kind of usage upstream as well, I 
raised this Jira to ask for the community's ideas.


> Allow use of user policy in CreateTopicPolicy 
> --
>
> Key: KAFKA-15701
> URL: https://issues.apache.org/jira/browse/KAFKA-15701
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiao Zhang
>Priority: Minor
>
> One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
> topic creation based on the requesting user.
> In particular, for secured clusters we add ACLs to specific users to allow 
> topic creation. At the same time, we need to design customized create-topic 
> policies for different users. For example, for user A, topic creation is 
> allowed only when the number of partitions is within a limit. For user B, we 
> allow topic creation without any check. From the standpoint of a Kafka service 
> provider, user A can be thought of as an ordinary user of the Kafka service 
> and user B as an internal user for cluster management.
> For this need, we patched our local fork of Kafka by passing the user 
> principal in KafkaApis.
> One place that needs to be revised is here: 
> [https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]
> Since it seems natural to support this kind of usage upstream as well, I 
> raised this Jira to ask for the community's ideas.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15701) Allow use of user policy in CreateTopicPolicy

2023-10-26 Thread Jiao Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiao Zhang updated KAFKA-15701:
---
Description: 
One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
topic creation based on the requesting user.

In particular, for secured clusters we add ACLs to specific users to allow 
topic creation. At the same time, we need to design customized create-topic 
policies for different users. For example, for user A, topic creation is 
allowed when the number of partitions is within a limit. For user B, we allow 
topic creation without any check. As the Kafka service provider, we think of 
user A as an ordinary user of the Kafka service and user B as an internal user 
for cluster management.

For this need, we patched our local fork of Kafka by passing the user 
principal in KafkaApis.

One place that needs to be revised is here: 
[https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]

Since it seems natural to support this kind of usage upstream as well, I 
raised this Jira to ask for the community's ideas.

  was:
One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
topic creation based on the requesting user.

In particular, for secured clusters we add ACLs to specific users to allow 
topic creation. At the same time, we need to design customized create-topic 
policies for different users. For example, for user A, topic creation is 
allowed when the number of partitions is within a limit. For user B, we allow 
topic creation without any check. As the Kafka service provider, we think of 
user A as an ordinary user of the Kafka service and user B as an internal user 
for cluster management.

For this need, we patched our local fork of Kafka by passing the user 
principal in KafkaApis.

One place that needs to be revised is here: 
[https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]

Since it seems natural to support this kind of usage upstream as well, I 
raised this Jira to ask for the community's ideas.


> Allow use of user policy in CreateTopicPolicy 
> --
>
> Key: KAFKA-15701
> URL: https://issues.apache.org/jira/browse/KAFKA-15701
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiao Zhang
>Priority: Minor
>
> One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
> topic creation based on the requesting user.
> In particular, for secured clusters we add ACLs to specific users to allow 
> topic creation. At the same time, we need to design customized create-topic 
> policies for different users. For example, for user A, topic creation is 
> allowed when the number of partitions is within a limit. For user B, we allow 
> topic creation without any check. As the Kafka service provider, we think of 
> user A as an ordinary user of the Kafka service and user B as an internal 
> user for cluster management.
> For this need, we patched our local fork of Kafka by passing the user 
> principal in KafkaApis.
> One place that needs to be revised is here: 
> [https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]
> Since it seems natural to support this kind of usage upstream as well, I 
> raised this Jira to ask for the community's ideas.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15701) Allow use of user policy in CreateTopicPolicy

2023-10-26 Thread Jiao Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiao Zhang updated KAFKA-15701:
---
Description: 
One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
topic creation based on the requesting user.

In particular, for secured clusters we add ACLs to specific users to allow 
topic creation. At the same time, we need to design customized create-topic 
policies for different users. For example, for user A, topic creation is 
allowed only when the number of partitions is within a limit. For user B, we 
allow topic creation without any check. As the Kafka service provider, we 
think of user A as an ordinary user of the Kafka service and user B as an 
internal user for cluster management.

For this need, we patched our local fork of Kafka by passing the user 
principal in KafkaApis.

One place that needs to be revised is here: 
[https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]

Since it seems natural to support this kind of usage upstream as well, I 
raised this Jira to ask for the community's ideas.

  was:
One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
topic creation based on the requesting user.

In particular, for secured clusters we add ACLs to specific users to allow 
topic creation. At the same time, we need to design customized create-topic 
policies for different users. For example, for user A, topic creation is 
allowed when the number of partitions is within a limit. For user B, we allow 
topic creation without any check. As the Kafka service provider, we think of 
user A as an ordinary user of the Kafka service and user B as an internal user 
for cluster management.

For this need, we patched our local fork of Kafka by passing the user 
principal in KafkaApis.

One place that needs to be revised is here: 
[https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]

Since it seems natural to support this kind of usage upstream as well, I 
raised this Jira to ask for the community's ideas.


> Allow use of user policy in CreateTopicPolicy 
> --
>
> Key: KAFKA-15701
> URL: https://issues.apache.org/jira/browse/KAFKA-15701
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiao Zhang
>Priority: Minor
>
> One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
> topic creation based on the requesting user.
> In particular, for secured clusters we add ACLs to specific users to allow 
> topic creation. At the same time, we need to design customized create-topic 
> policies for different users. For example, for user A, topic creation is 
> allowed only when the number of partitions is within a limit. For user B, we 
> allow topic creation without any check. As the Kafka service provider, we 
> think of user A as an ordinary user of the Kafka service and user B as an 
> internal user for cluster management.
> For this need, we patched our local fork of Kafka by passing the user 
> principal in KafkaApis.
> One place that needs to be revised is here: 
> [https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]
> Since it seems natural to support this kind of usage upstream as well, I 
> raised this Jira to ask for the community's ideas.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15701) Allow use of user policy in CreateTopicPolicy

2023-10-26 Thread Jiao Zhang (Jira)
Jiao Zhang created KAFKA-15701:
--

 Summary: Allow use of user policy in CreateTopicPolicy 
 Key: KAFKA-15701
 URL: https://issues.apache.org/jira/browse/KAFKA-15701
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiao Zhang


One use case of CreateTopicPolicy we have encountered is allowing or rejecting 
topic creation based on the requesting user.

In particular, for secured clusters we add ACLs to specific users to allow 
topic creation. At the same time, we need to design customized create-topic 
policies for different users. For example, for user A, topic creation is 
allowed when the number of partitions is within a limit. For user B, we allow 
topic creation without any check. As the Kafka service provider, we think of 
user A as an ordinary user of the Kafka service and user B as an internal user 
for cluster management.

For this need, we patched our local fork of Kafka by passing the user 
principal in KafkaApis.

One place that needs to be revised is here: 
[https://github.com/apache/kafka/blob/834f72b03de40fb47caaad1397ed061de57c2509/core/src/main/scala/kafka/server/KafkaApis.scala#L1980]

Since it seems natural to support this kind of usage upstream as well, I 
raised this Jira to ask for the community's ideas.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-10-26 Thread via GitHub


mjsax commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1374065471


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a 
given key, and return

Review Comment:
   For my own education: Why do we need a `ConcurrentMap`?



##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a 
given key, and return
+ * a previous value and an Instant for that value.
+ *
+ * @param <T> The type of the value.
+ */
+public class LastValueTracker<T> {
+private final ConcurrentMap<MetricKey, AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();
+
+/**
+ * Return the last instant/value for the given MetricKey, or 
Optional.empty if there isn't one.
+ *
+ * @param metricKey the key for which to calculate a getAndSet.
+ * @param now the timestamp for the new value.
+ * @param value the current value.
+ * @return the timestamp of the previous entry and its value. If there
+ * isn't a previous entry, then this method returns {@link 
Optional#empty()}
+ */
+public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant now, T value) {
+InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
+AtomicReference<InstantAndValue<T>> valueOrNull = counters

Review Comment:
   Not sure why we need to wrap `instantAndValue` in an `AtomicReference`. Can 
you elaborate?
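For context on the two questions above, a self-contained sketch (not the PR's actual code, all names illustrative) of the "get previous value, store current value" pattern under discussion. A plain ConcurrentMap.put already returns the previous mapping atomically, so whether the extra AtomicReference wrapper is needed depends on how the tracker is driven by concurrent metric collectors.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified stand-in for the classes discussed in the PR; names are illustrative only.
final class SimpleLastValueTracker<T> {

    static final class InstantAndValue<V> {
        final Instant instant;
        final V value;

        InstantAndValue(Instant instant, V value) {
            this.instant = instant;
            this.value = value;
        }
    }

    private final ConcurrentMap<String, InstantAndValue<T>> lastValues = new ConcurrentHashMap<>();

    // ConcurrentMap.put is atomic and returns the previous mapping, which is
    // all a "get previous, store current" delta computation needs.
    Optional<InstantAndValue<T>> getAndSet(String key, Instant now, T value) {
        return Optional.ofNullable(lastValues.put(key, new InstantAndValue<>(now, value)));
    }

    public static void main(String[] args) {
        SimpleLastValueTracker<Long> tracker = new SimpleLastValueTracker<>();
        tracker.getAndSet("records-sent", Instant.now(), 100L);
        // The second call observes the first value, e.g. to emit a delta of 50.
        tracker.getAndSet("records-sent", Instant.now(), 150L)
               .ifPresent(prev -> System.out.println("previous = " + prev.value));
    }
}
```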



##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import 

[jira] [Commented] (KAFKA-15695) Local log start offset is not updated on the follower after rebuilding remote log auxiliary state

2023-10-26 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780166#comment-17780166
 ] 

Kamal Chandraprakash commented on KAFKA-15695:
--

[~nikramakrishnan]
I remember fixing this issue in 
[#14328|https://github.com/apache/kafka/pull/14328] and covering it with an 
integration test. Can you come up with a test that reproduces this issue? Thanks!

> Local log start offset is not updated on the follower after rebuilding remote 
> log auxiliary state
> -
>
> Key: KAFKA-15695
> URL: https://issues.apache.org/jira/browse/KAFKA-15695
> Project: Kafka
>  Issue Type: Bug
>  Components: replication, Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Nikhil Ramakrishnan
>Assignee: Nikhil Ramakrishnan
>Priority: Major
>  Labels: KIP-405, tiered-storage
> Fix For: 3.7.0
>
>
> In 3.6, the local log start offset is not updated when reconstructing the 
> auxiliary state of the remote log on a follower.
> The impact of this bug is significant because, if this follower becomes the 
> leader before the local log start offset has had a chance to be updated, 
> reads from any offset between [wrong log start offset; actual log start 
> offset] will be routed to the local storage, which does not contain the 
> corresponding data. In this case, consumer reads will never be satisfied.
>  
> Reproduction case:
>  # Create a cluster with 2 brokers, broker 0 and broker 1.
>  # Create a topic topicA with RF=2, 1 partition (topicA-0) and 2 batches per 
> segment, with broker 0 as the leader.
>  # Stop broker 1, and produce 3 records to topicA, such that segment 1 with 
> the first two records are copied to remote and deleted from local storage.
>  # Start broker 1, let it catch up with broker 0.
>  # Stop broker 0 such that broker 1 is elected as the leader, and try to 
> consume from the beginning of topicA-0.
> This consumer read will not be satisfied because the local log start offset 
> is not updated on broker 1 when it builds the auxiliary state of the remote 
> log segments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


philipnee commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1782275615

   Hi @lucasbru - Thanks for taking the time to review my PR.  I addressed all 
but 2 comments:
   1. Mockito: Could you be more specific on how you expect to mock the 
response object?
   2. Error handling: Essentially all errors in `continueHandlePartitionErrors` 
can only happen in the response. I understand there is some redundancy there and 
it can be a bit confusing. But the callback sees either a hard failure (response 
= null and throwable = non-null) or a server-side error (response = non-null). 
That is why the two paths were kept separate. If you find it unclear, how would 
you suggest making it more readable?
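For context on point 2 above, a minimal stand-in sketch of the two paths being described; the types and helper names are hypothetical, not the actual CommitRequestManager code.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only: a hard failure carries a throwable and no response, while a
// server-side error arrives inside a response and carries an error code.
final class CommitResponseHandlingSketch {

    enum ServerError { NONE, COORDINATOR_NOT_AVAILABLE }

    static CompletableFuture<Void> handle(Object response, Throwable throwable, ServerError error) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (throwable != null) {
            // Hard failure: the request itself failed (response == null, throwable != null).
            result.completeExceptionally(throwable);
        } else if (error != ServerError.NONE) {
            // Server-side error: a response arrived (response != null) but carries an error code.
            result.completeExceptionally(new RuntimeException("commit failed: " + error));
        } else {
            result.complete(null);
        }
        return result;
    }
}
```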


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1374057359


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -127,7 +146,7 @@ public void maybeAutoCommit(final Map offsets
  * Handles {@link 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It 
creates an
  * {@link OffsetCommitRequestState} and enqueue it to send later.
  */
-public CompletableFuture addOffsetCommitRequest(final 
Map offsets) {
+public OffsetCommitRequestState addOffsetCommitRequest(final 
Map offsets) {

Review Comment:
   Got it, I think the main reason was to test retryBackoff.  I made some 
changes according to your suggestions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-26 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1374013380


##
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java:
##
@@ -34,6 +34,9 @@
 public abstract class WrappedStateStore<S extends StateStore, K, V> implements StateStore, CachedStateStore<K, V> {
 
 public static boolean isTimestamped(final StateStore stateStore) {
+if (stateStore instanceof 
KeyValueToTimestampedKeyValueByteStoreAdapter) {

Review Comment:
   got it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-26 Thread via GitHub


mjsax commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1374019186


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class MetricNamingConvention {
+
+private static final String NAME_JOINER = ".";
+private static final String TAG_JOINER = "_";
+
+// remove metrics as it is redundant for telemetry metrics naming 
convention
+private final static Pattern GROUP_PATTERN = 
Pattern.compile("\\.(metrics)");
+
+public static MetricNamingStrategy<MetricName> getClientTelemetryMetricNamingStrategy(String domain) {
+return new MetricNamingStrategy<MetricName>() {
+@Override
+public MetricKey metricKey(MetricName metricName) {
+Objects.requireNonNull(metricName, "metric name cannot be 
null");
+String group = metricName.group() == null ? "" : 
metricName.group();
+String rawName = metricName.name() == null ? "" : 
metricName.name();
+
+return new MetricKey(fullMetricName(domain, group, rawName),
+Collections.unmodifiableMap(cleanTags(metricName.tags(;
+}
+
+@Override
+public MetricKey derivedMetricKey(MetricKey key, String 
derivedComponent) {
+Objects.requireNonNull(derivedComponent, "derived component 
cannot be null");
+return new MetricKey(key.getName() + NAME_JOINER + 
derivedComponent, key.tags());
+}
+};
+}
+
+/**
+ * Creates a metric name given the domain, group, and name. The new String 
follows the following
+ * conventions and rules:
+ *
+ * 
+ *   domain is expected to be a host-name like value, e.g. {@code 
org.apache.kafka}
+ *   group is cleaned of redundant words: "-metrics"
+ *   the group and metric name is dot separated
+ *   The name is created by joining the three components, e.g.:
+ * {@code org.apache.kafka.producer.connection.creation.rate}
+ * 
+ */
+private static String fullMetricName(String domain, String group, String 
name) {
+return domain
++ NAME_JOINER
++ cleanGroup(group)
++ NAME_JOINER
++ cleanMetric(name);
+}
+
+/**
+ * This method maps a raw name to follow conventions and cleans up the 
result to be more legible:
+ * 
+ *   converts names to lower hyphen case conventions
+ *   strips redundant parts of the metric name, such as -metrics
+ *   normalizes artifacts of hyphen case to dot separated conversion
+ * 
+ */
+private static String cleanGroup(String group) {
+group = clean(group, NAME_JOINER);
+return GROUP_PATTERN.matcher(group).replaceAll("");
+}
+
+private static String cleanMetric(String metric) {
+return clean(metric, NAME_JOINER);
+}
+
+/**
+ * Converts a tag name to match the telemetry naming conventions by 
converting snake_case.
+ * 
+ * Kafka metrics have tags name in lower case separated by hyphens. Eg: 
total-errors
+ *
+ * @param raw the input map
+ * @return the new map with keys replaced by snake_case representations.
+ */
+private static Map<String, String> cleanTags(Map<String, String> raw) {
+return raw.entrySet()
+.stream()
+.collect(Collectors.toMap(s -> clean(s.getKey(), TAG_JOINER), 
Entry::getValue));
+}
+
+private static String clean(String raw, String joiner) {
+Objects.requireNonNull(raw, "metric 

[jira] [Resolved] (KAFKA-15390) FetchResponse.preferredReplica may contains fenced replica in KRaft mode

2023-10-26 Thread Deng Ziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deng Ziming resolved KAFKA-15390.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

> FetchResponse.preferredReplica may contains fenced replica in KRaft mode
> 
>
> Key: KAFKA-15390
> URL: https://issues.apache.org/jira/browse/KAFKA-15390
> Project: Kafka
>  Issue Type: Bug
>Reporter: Deng Ziming
>Assignee: Deng Ziming
>Priority: Major
> Fix For: 3.6.0
>
>
> `KRaftMetadataCache.getPartitionReplicaEndpoints` will return a fenced broker 
> id.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1374040965


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -382,9 +522,49 @@ private ClientResponse buildOffsetFetchClientResponse(
 return buildOffsetFetchClientResponse(request, topicPartitionData, 
error);
 }
 
+private ClientResponse buildOffsetCommitClientResponse(final 
OffsetCommitResponse commitResponse,
+   final Errors error) 
{
+OffsetCommitResponseData data = new OffsetCommitResponseData();
+OffsetCommitResponse response = new OffsetCommitResponse(data);
+short apiVersion = 1;
+return new ClientResponse(
+new RequestHeader(ApiKeys.OFFSET_COMMIT, apiVersion, "", 1),
+null,
+"-1",
+time.milliseconds(),
+time.milliseconds(),
+false,
+null,
+null,
+commitResponse
+);
+}
+
+public ClientResponse mockOffsetCommitResponse(String topic, int 
partition, short apiKeyVersion, Errors error) {

Review Comment:
    Could you be more specific: do you mean doing something like this?
   
   ```
   response = mock(ClientResponse.class);
   when(response.receivedTimeMs()).thenReturn(...);
   when(response.data()).thenReturn(responseData);
   return response;
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -382,9 +522,49 @@ private ClientResponse buildOffsetFetchClientResponse(
 return buildOffsetFetchClientResponse(request, topicPartitionData, 
error);
 }
 
+private ClientResponse buildOffsetCommitClientResponse(final 
OffsetCommitResponse commitResponse,
+   final Errors error) 
{
+OffsetCommitResponseData data = new OffsetCommitResponseData();
+OffsetCommitResponse response = new OffsetCommitResponse(data);
+short apiVersion = 1;
+return new ClientResponse(
+new RequestHeader(ApiKeys.OFFSET_COMMIT, apiVersion, "", 1),
+null,
+"-1",
+time.milliseconds(),
+time.milliseconds(),
+false,
+null,
+null,
+commitResponse
+);
+}
+
+public ClientResponse mockOffsetCommitResponse(String topic, int 
partition, short apiKeyVersion, Errors error) {

Review Comment:
    Could you be more specific: do you mean doing something like this?
   
   ```
   response = mock(ClientResponse.class);
   when(response.receivedTimeMs()).thenReturn(...);
   when(response.data()).thenReturn(responseData);
   return response;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1374037210


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Value object that contains the name and tags for a Metric.
+ */
+public class MetricKey implements MetricKeyable {
+
+private final String name;
+private final Map<String, String> tags;
+
+/**
+ * Create a {@code MetricKey}
+ *
+ * @param name metric name. This should be the telemetry metric name of 
the metric (the final name
+ * under which this metric is emitted).
+ */
+public MetricKey(String name) {
+this(name, null);
+}
+
+/**
+ * Create a {@code MetricKey}
+ *
+ * @param name metric name. This should be the .converted. name of the 
metric (the final name
+ * under which this metric is emitted).
+ * @param tags mapping of tag keys to values.
+ */
+public MetricKey(String name, Map<String, String> tags) {
+this.name = Objects.requireNonNull(name);
+this.tags = tags != null ? Collections.unmodifiableMap(tags) : 
Collections.emptyMap();
+}
+
+public MetricKey(MetricName metricName) {
+this(metricName.name(), metricName.tags());
+}
+
+@Override
+public MetricKey key() {
+return this;
+}
+
+public String getName() {

Review Comment:
   I was told the same, but it doesn't seem to be enforced everywhere. A case 
for an Apache Kafka style guide.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-26 Thread via GitHub


philipnee commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1782236753

   Hi @kirktrue, thanks for taking the time to review my code. I made changes 
according to your suggestions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374036176


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = "generic";
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +
+"support GENERIC or CONSUMER. If CONSUMER is specified, then the 
consumer group protocol will be used.  " +
+"Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server side assignor 
to use. It cannot be used in " +
+"conjunction with group.local.assignor. The group 
coordinator will choose the assignor if no " +

Review Comment:
   Good call. Yes.
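To make the new options concrete, a hedged sketch of how a consumer might be configured once this change lands. The property names come from the diff above; the literal values (protocol name, assignor name) and broker-side availability are assumptions.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupProtocolConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // New configs added by this PR (string keys per the diff above; the
        // GROUP_PROTOCOL_CONFIG / REMOTE_ASSIGNOR_CONFIG constants would be
        // available on ConsumerConfig once the PR is merged).
        props.put("group.protocol", "consumer");       // or "generic" (the default)
        props.put("group.remote.assignor", "uniform"); // assignor name is an assumption

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() / poll() as usual; the group protocol only changes
            // how membership and assignment are coordinated.
        }
    }
}
```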



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374035895


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),

Review Comment:
   108 char width!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374035641


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),
+Importance.HIGH,
+GROUP_PROTOCOL_DOC)
+.define(REMOTE_ASSIGNOR_CONFIG,
+Type.STRING,
+DEFAULT_REMOTE_ASSIGNOR,
+Importance.MEDIUM,
+REMOTE_ASSIGNOR_DOC)

Review Comment:
   You are right! I kept it that way because I was trying to avoid editing a 
few hundred lines of code!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15445: [WIP] Add JVM Docker image [kafka]

2023-10-26 Thread via GitHub


sanjay-awatramani commented on code in PR #14552:
URL: https://github.com/apache/kafka/pull/14552#discussion_r1374022606


##
docker/docker_release.py:
##
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Python script to build and push docker image
+Usage: docker_release.py
+
+Interactive utility to push the docker image to dockerhub
+"""
+
+import subprocess
+from distutils.dir_util import copy_tree
+from datetime import date
+import shutil
+
+def push_jvm(image, kafka_url):
+copy_tree("resources", "jvm/resources")
+subprocess.run(["docker", "buildx", "build", "-f", "jvm/Dockerfile", 
"--build-arg", f"kafka_url={kafka_url}", "--build-arg", 
f"build_date={date.today()}",
+"--push",
+"--platform", "linux/amd64,linux/arm64",
+"--tag", image, "jvm"])
+shutil.rmtree("jvm/resources")
+
+def login():
+status = subprocess.run(["docker", "login"])
+if status.returncode != 0:
+print("Docker login failed, aborting the docker release")
+raise PermissionError
+
+def create_builder():
+subprocess.run(["docker", "buildx", "create", "--name", "kafka-builder", 
"--use"])
+
+def remove_builder():
+subprocess.run(["docker", "buildx", "rm", "kafka-builder"])
+
+if __name__ == "__main__":

Review Comment:
   This is an interactive script. How would we support automation of the 
release process via GH Actions or Jenkins?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2023-10-26 Thread via GitHub


ocadaruma commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1374014105


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -462,22 +462,32 @@ public Optional lastEntry(long 
producerId) {
 }
 
 /**
- * Take a snapshot at the current end offset if one does not already exist.
+ * Take a snapshot at the current end offset if one does not already exist 
with syncing the change to the device
  */
 public void takeSnapshot() throws IOException {
+takeSnapshot(true);
+}
+
+/**
+ * Take a snapshot at the current end offset if one does not already 
exist, then return the snapshot file if taken.
+ */
+public Optional<File> takeSnapshot(boolean sync) throws IOException {

Review Comment:
   Thanks, that's a good point. I overlooked that snapshot files would be 
cleaned up upon receiving `OffsetMovedToRemoteStorage`.
   
   In this case, if the async flush is performed against a non-existent file, 
it would throw an IOException, so we should catch it and ignore it if it's a 
NoSuchFileException.
   (Since file creation is still done in the original thread, it shouldn't 
conflict with truncateFullyAndReloadSnapshots. Only the fsync is moved to the 
async thread.)
   
   I'll fix that.
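A minimal sketch of the handling described above, under the assumption that the fsync runs on a background thread against a file that may already have been deleted; the class and method names are illustrative, not the PR's code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

final class AsyncSnapshotFsyncSketch {
    // Flush a snapshot file to the device off the hot path; if the file was
    // already cleaned up (e.g. after OffsetMovedToRemoteStorage), ignore it.
    static CompletableFuture<Void> fsyncAsync(Path snapshotFile) {
        return CompletableFuture.runAsync(() -> {
            try (FileChannel channel = FileChannel.open(snapshotFile, StandardOpenOption.WRITE)) {
                channel.force(true);
            } catch (NoSuchFileException e) {
                // Snapshot deleted concurrently; nothing to sync.
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}
```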



##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -462,22 +462,32 @@ public Optional lastEntry(long 
producerId) {
 }
 
 /**
- * Take a snapshot at the current end offset if one does not already exist.
+ * Take a snapshot at the current end offset if one does not already exist 
with syncing the change to the device
  */
 public void takeSnapshot() throws IOException {
+takeSnapshot(true);
+}
+
+/**
+ * Take a snapshot at the current end offset if one does not already 
exist, then return the snapshot file if taken.
+ */
+public Optional<File> takeSnapshot(boolean sync) throws IOException {

Review Comment:
   Thanks, that's a good point. I overlooked that snapshot files would be 
cleaned up upon receiving `OffsetMovedToRemoteStorage`.
   
   In this case, if the async flush is performed against a non-existent file, 
it would throw an IOException, so we should catch it and ignore it if it's a 
NoSuchFileException.
   (Since file creation is still done in the original thread, it shouldn't 
conflict with `truncateFullyAndReloadSnapshots`. Only the fsync is moved to the 
async thread.)
   
   I'll fix that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]

2023-10-26 Thread via GitHub


mjsax commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1374016565


##
build.gradle:
##
@@ -295,7 +296,12 @@ subprojects {
   }
   publications {
 mavenJava(MavenPublication) {
-  from components.java
+  if (!shouldPublishWithShadow) {
+from components.java
+  } else {
+apply plugin: 'com.github.johnrengelman.shadow'

Review Comment:
   What is this plugin? It does not look like something official. Not sure if 
we should use something that is not backed by an official OSS project?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-26 Thread via GitHub


mjsax commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1374007941


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Value object that contains the name and tags for a Metric.
+ */
+public class MetricKey implements MetricKeyable {
+
+private final String name;
+private final Map<String, String> tags;
+
+/**
+ * Create a {@code MetricKey}
+ *
+ * @param name metric name. This should be the telemetry metric name of 
the metric (the final name
+ * under which this metric is emitted).
+ */
+public MetricKey(String name) {
+this(name, null);
+}
+
+/**
+ * Create a {@code MetricKey}
+ *
+ * @param name metric name. This should be the .converted. name of the 
metric (the final name
+ * under which this metric is emitted).
+ * @param tags mapping of tag keys to values.
+ */
+public MetricKey(String name, Map<String, String> tags) {
+this.name = Objects.requireNonNull(name);
+this.tags = tags != null ? Collections.unmodifiableMap(tags) : 
Collections.emptyMap();
+}
+
+public MetricKey(MetricName metricName) {
+this(metricName.name(), metricName.tags());
+}
+
+@Override
+public MetricKey key() {
+return this;
+}
+
+public String getName() {

Review Comment:
   nit: we don't use `get` prefix as a convention; should be rename to `name()`



##
clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.telemetry;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.nio.ByteBuffer;
+
+@InterfaceStability.Evolving
+public interface ClientTelemetryPayload {

Review Comment:
   Should we add a class level JavaDoc describing it? It's a public interface, 
so seems important?



##
clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetry.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry;
+
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import 

Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2023-10-26 Thread via GitHub


ocadaruma commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1374014105


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -462,22 +462,32 @@ public Optional lastEntry(long 
producerId) {
 }
 
 /**
- * Take a snapshot at the current end offset if one does not already exist.
+ * Take a snapshot at the current end offset if one does not already exist 
with syncing the change to the device
  */
 public void takeSnapshot() throws IOException {
+takeSnapshot(true);
+}
+
+/**
+ * Take a snapshot at the current end offset if one does not already 
exist, then return the snapshot file if taken.
+ */
+public Optional<File> takeSnapshot(boolean sync) throws IOException {

Review Comment:
   Thanks, that's a good point. I overlooked that snapshot files would be 
cleaned up upon receiving `OffsetMovedToRemoteStorage`.
   
   In this case, if the async flush is performed against a non-existent file, 
it would throw an IOException, so we should catch it and ignore it if it's a 
NoSuchFileException.
   (Since file creation is still done in the original thread. Only the fsync is 
moved to the async thread.)
   
   I'll fix that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-26 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1374013380


##
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java:
##
@@ -34,6 +34,9 @@
 public abstract class WrappedStateStore<S extends StateStore, K, V> implements StateStore, CachedStateStore<K, V> {
 
 public static boolean isTimestamped(final StateStore stateStore) {
+if (stateStore instanceof 
KeyValueToTimestampedKeyValueByteStoreAdapter) {

Review Comment:
   but now `TimestampedBytesStore` looks like this:
   
   ```
   public interface TimestampedBytesStore {
   static byte[] convertToTimestampedFormat(final byte[] plainValue) {
   if (plainValue == null) {
   return null;
   }
   return ByteBuffer
   .allocate(8 + plainValue.length)
   .putLong(NO_TIMESTAMP)
   .put(plainValue)
   .array();
   }
   }
   ```
   
   Do you mean we should implement a method that returns true in this interface?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15201: Allow git push to fail gracefully [kafka]

2023-10-26 Thread via GitHub


Owen-CH-Leung commented on code in PR #14645:
URL: https://github.com/apache/kafka/pull/14645#discussion_r1374005528


##
release.py:
##
@@ -730,7 +730,7 @@ def select_gpg_key():
 fail("Ok, giving up")
 if not user_ok("Ok to push RC tag %s (y/n)?: " % rc_tag):
 fail("Ok, giving up")
-cmd("Pushing RC tag", "git push %s %s" % (PUSH_REMOTE_NAME, rc_tag))
+cmd("Pushing RC tag", "git push %s %s" % (PUSH_REMOTE_NAME, rc_tag), 
num_retries=0)

Review Comment:
   If we put `allow_failure=True`, then when `git push` fails, the program will 
continue to run the remaining lines after `git push` instead of failing 
gracefully. So I think we should just keep `num_retries=0`, so that it triggers 
the `fail` function and proceeds to do the cleanup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]

2023-10-26 Thread via GitHub


yyu1993 commented on code in PR #14545:
URL: https://github.com/apache/kafka/pull/14545#discussion_r1373996821


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1216,15 +1228,21 @@ class LogManager(logDirs: Seq[File],
 cleaner.updateCheckpoints(removedLog.parentDirFile, 
partitionToRemove = Option(topicPartition))
   }
 }
-removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), 
false)
+if (isStray) {

Review Comment:
   Got it. Makes sense. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15689: Logging skipped event when expected migration state is wrong [kafka]

2023-10-26 Thread via GitHub


showuon commented on PR #14646:
URL: https://github.com/apache/kafka/pull/14646#issuecomment-1782174276

   @ppatierno , please resolve the conflicts. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-10-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15319:

Fix Version/s: 3.5.2

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Assignee: Lucas Brutschy
>Priority: Critical
> Fix For: 3.6.0, 3.5.2
>
> Attachments: compat_report.html.zip
>
>
> Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12
> Upgrade to 1.2.13 to fix 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434 [kafka]

2023-10-26 Thread via GitHub


mjsax commented on PR #14216:
URL: https://github.com/apache/kafka/pull/14216#issuecomment-1782162923

   Cherry-picked this to `3.5` branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]

2023-10-26 Thread via GitHub


mjsax commented on code in PR #14596:
URL: https://github.com/apache/kafka/pull/14596#discussion_r1373965049


##
streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a single record from a versioned state 
store based on its key and timestamp.

Review Comment:
   We should add `@param` tags to describe the generic types.
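   
   For instance (wording illustrative; the type-parameter names `K` and `V` are assumed):
   
   ```
   /**
    * Interactive query for retrieving a single record from a versioned state
    * store based on its key and timestamp.
    *
    * @param <K> the type of the key
    * @param <V> the type of the result value
    */
   ```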



##
streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a single record from a versioned state 
store based on its key and timestamp.
+ */
+@Evolving
+public final class VersionedKeyQuery implements 
Query> {
+
+private final K key;
+private final Optional asOfTimestamp;
+
+private VersionedKeyQuery(final K key, final Optional 
asOfTimestamp) {
+this.key = Objects.requireNonNull(key);
+this.asOfTimestamp = asOfTimestamp;
+}
+
+/**
+ * Creates a query that will retrieve the record from a versioned state 
store identified by {@code key} if it exists
+ * (or {@code null} otherwise).
+ * @param key The key to retrieve
+ * @param  The type of the key
+ * @param  The type of the value that will be retrieved
+ * @throws NullPointerException if @param key is null
+ */
+public static  VersionedKeyQuery withKey(final K key) {
+return new VersionedKeyQuery<>(key, Optional.empty());
+}
+
+/**
+ * Specifies the timestamp for the key query. The key query returns the 
record version for the specified timestamp.

Review Comment:
   `record's` ? (not sure)



##
streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import 

Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-26 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1373981478


##
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java:
##
@@ -34,6 +34,9 @@
 public abstract class WrappedStateStore implements 
StateStore, CachedStateStore {
 
 public static boolean isTimestamped(final StateStore stateStore) {
+if (stateStore instanceof KeyValueToTimestampedKeyValueByteStoreAdapter) {

Review Comment:
   We should not add this case, but rather let 
`KeyValueToTimestampedKeyValueByteStoreAdapter` implement the marker interface 
`TimestampedBytesStore` such that the existing condition returns `true` for 
this store.
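   
   A tiny, self-contained illustration of why the marker interface keeps the existing check simple (plain Java, not the actual Streams classes):
   
   ```
   interface StateStore { }
   interface TimestampedBytesStore { }  // marker interface
   
   class PlainStore implements StateStore { }
   class AdapterStore implements StateStore, TimestampedBytesStore { }
   
   public class MarkerInterfaceDemo {
       static boolean isTimestamped(final StateStore store) {
           // No per-class special case needed: any store declaring the marker matches.
           return store instanceof TimestampedBytesStore;
       }
   
       public static void main(final String[] args) {
           System.out.println(isTimestamped(new PlainStore()));   // false
           System.out.println(isTimestamped(new AdapterStore())); // true
       }
   }
   ```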



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-26 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1373977415


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##
@@ -346,6 +346,23 @@ public static  Function 
getDeserializeValue(final StateSerdes deserializer.deserialize(serdes.topic(), 
byteArray);
 }
 
+@SuppressWarnings({"unchecked", "rawtypes"})
+public static  Function getDeserializeValue2(final 
StateSerdes serdes,
+   final 
StateStore wrapped,
+   final boolean 
isDSLStore ) {
+final Serde valueSerde = serdes.valueSerde();
+final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) 
|| isDSLStore;
+final Deserializer deserializer;
+if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review Comment:
   This only solves KeyQuery temporarily; I am now debugging RangeQuery. We should convert the `RocksDBRangeIterator` to a `KeyValueToTimestampedKeyValueIteratorAdapter` in `KeyValueToTimestampedKeyValueByteStoreAdapter`, because we want to turn the plain KV iterator into a timestamped KV iterator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-26 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1373970782


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##
@@ -346,6 +346,23 @@ public static  Function 
getDeserializeValue(final StateSerdes deserializer.deserialize(serdes.topic(), 
byteArray);
 }
 
+@SuppressWarnings({"unchecked", "rawtypes"})
+public static  Function getDeserializeValue2(final 
StateSerdes serdes,
+   final 
StateStore wrapped,
+   final boolean 
isDSLStore ) {
+final Serde valueSerde = serdes.valueSerde();
+final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) 
|| isDSLStore;
+final Deserializer deserializer;
+if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review Comment:
   Yes, I found that bug; it's in `WrappedStateStore`. We should add a statement like this:
   ```
   public static boolean isTimestamped(final StateStore stateStore) {
   if (stateStore instanceof 
KeyValueToTimestampedKeyValueByteStoreAdapter) {
   return true;
   }
   
   ```
   
   But this only solves the ROCK_KV DSL store with cache = true, because with cache = true we get the data from `CachingKeyValueStore` rather than `KeyValueToTimestampedKeyValueByteStoreAdapter`.
   
   
   We still have to force `timestamped` to be true in `getDeserializeValue`, because a store like `CachingKeyValueStore` is not a timestamped KV store, yet the ROCK_KV DSL store reads its data through it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-26 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1373970782


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##
@@ -346,6 +346,23 @@ public static  Function 
getDeserializeValue(final StateSerdes deserializer.deserialize(serdes.topic(), 
byteArray);
 }
 
+@SuppressWarnings({"unchecked", "rawtypes"})
+public static  Function getDeserializeValue2(final 
StateSerdes serdes,
+   final 
StateStore wrapped,
+   final boolean 
isDSLStore ) {
+final Serde valueSerde = serdes.valueSerde();
+final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) 
|| isDSLStore;
+final Deserializer deserializer;
+if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review Comment:
   Yes, I found that bug; it's in `WrappedStateStore`. We should add a statement like this:
   ```
   public static boolean isTimestamped(final StateStore stateStore) {
   if (stateStore instanceof 
KeyValueToTimestampedKeyValueByteStoreAdapter) {
   return true;
   }
   if (stateStore instanceof TimestampedBytesStore) {
   return true;
   } else if (stateStore instanceof WrappedStateStore) {
   
   ```
   
   But this only solves the ROCK_KV DSL store with cache = true.
   
   We have to force `timestamped` to be true.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15527: Update Javadoc for range and reverseRange in ReadOnlyKeyValueStore and update upgrade-guide.html for kafka streams webpage [kafka]

2023-10-26 Thread via GitHub


mjsax merged PR #14600:
URL: https://github.com/apache/kafka/pull/14600


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-26 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1373953314


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -126,15 +128,22 @@ public  QueryResult query(
 final PositionBound positionBound,
 final QueryConfig config) {
 
-
 final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
-final QueryResult result = store.query(query, positionBound, 
config);
+QueryResult result = store.query(query, positionBound, config);
+Position position = result.getPosition();
+if (result.isSuccess()) {
+byte[] res = (byte[]) result.getResult();
+byte[] res1 = convertToTimestampedFormat(res);
+result = (QueryResult) QueryResult.forResult(res1);
+}
+
 if (config.isCollectExecutionInfo()) {
 final long end = System.nanoTime();
 result.addExecutionInfo(
 "Handled in " + getClass() + " in " + (end - start) + "ns"
 );
 }
+result.setPosition(position);

Review Comment:
   Why this change?



##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -126,15 +128,22 @@ public  QueryResult query(
 final PositionBound positionBound,
 final QueryConfig config) {
 
-
 final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
-final QueryResult result = store.query(query, positionBound, 
config);
+QueryResult result = store.query(query, positionBound, config);
+Position position = result.getPosition();
+if (result.isSuccess()) {
+byte[] res = (byte[]) result.getResult();
+byte[] res1 = convertToTimestampedFormat(res);
+result = (QueryResult) QueryResult.forResult(res1);

Review Comment:
   We need to set the position on the newly created result. We should also copy 
the "execution information" (also wondering if we should add a new entry to 
it?) \cc @aliehsaeedii WDYT?
   
   Maybe we can actually re-use 
`InternalQueryResultUtil.copyAndSubstituteDeserializedResult` ?
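   
   For reference, a hedged sketch of doing the copy by hand via the public `QueryResult` API (whether `InternalQueryResultUtil` already covers all of this is left open; variable and class names here are illustrative):
   
   ```
   import java.util.List;
   import org.apache.kafka.streams.query.QueryResult;
   import org.apache.kafka.streams.state.TimestampedBytesStore;
   
   public final class QueryResultConversion {
       // Build the timestamped-format result while carrying over the position and
       // execution info of the raw result returned by the inner store.
       public static QueryResult<byte[]> withTimestampedFormat(final QueryResult<byte[]> raw) {
           if (!raw.isSuccess()) {
               return raw;
           }
           final byte[] valueWithTimestamp =
               TimestampedBytesStore.convertToTimestampedFormat(raw.getResult());
           final QueryResult<byte[]> converted = QueryResult.forResult(valueWithTimestamp);
           converted.setPosition(raw.getPosition());
           final List<String> executionInfo = raw.getExecutionInfo();
           if (executionInfo != null) {
               executionInfo.forEach(converted::addExecutionInfo);
           }
           return converted;
       }
   }
   ```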



##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##
@@ -126,15 +128,22 @@ public  QueryResult query(
 final PositionBound positionBound,
 final QueryConfig config) {
 
-
 final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
-final QueryResult result = store.query(query, positionBound, 
config);
+QueryResult result = store.query(query, positionBound, config);
+Position position = result.getPosition();
+if (result.isSuccess()) {
+byte[] res = (byte[]) result.getResult();
+byte[] res1 = convertToTimestampedFormat(res);

Review Comment:
   Use better variable names, e.g. `res` -> `plainValue` and `res1` -> `valueWithTimestamp`.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##
@@ -102,4 +140,230 @@ static class RawAndDeserializedValue {
 this.value = value;
 }
 }
+
+@SuppressWarnings("unchecked")
+@Override
+public  QueryResult query(final Query query,
+final PositionBound positionBound,
+final QueryConfig config) {
+
+final long start = time.nanoseconds();
+final QueryResult result;
+
+final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, config);
+if (config.isCollectExecutionInfo()) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+}
+} else {
+result = (QueryResult) handler.apply(
+query,
+positionBound,
+config,
+this
+);
+if (config.isCollectExecutionInfo()) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " with serdes "
++ serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+}
+}
+return result;
+}
+
+
+
+@SuppressWarnings("unchecked")
+protected  QueryResult runTimestampKeyQuery(final Query query,
+  final PositionBound 
positionBound,
+  final QueryConfig 
config) {
+final QueryResult result;
+final TimestampedKeyQuery typedKeyQuery = 

[jira] [Updated] (KAFKA-15690) EosIntegrationTest is flaky.

2023-10-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15690:

Component/s: streams
 unit tests

> EosIntegrationTest is flaky.
> 
>
> Key: KAFKA-15690
> URL: https://issues.apache.org/jira/browse/KAFKA-15690
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Calvin Liu
>Priority: Major
>
> EosIntegrationTest
> shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2,
>  processing threads = false]
> {code:java}
> org.junit.runners.model.TestTimedOutException: test timed out after 600 
> seconds   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:)
> at 
> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:821)
>   at 
> org.apache.kafka.clients.NetworkClient.processTimeoutDisconnection(NetworkClient.java:779)
>at 
> org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:837)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=multiPartitionInputTopic, partition=1, offset=15, 
> stacktrace=java.lang.RuntimeException: Detected we've been interrupted.   
> at 
> org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892)
>at 
> org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867)
>at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
> at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>  {code}
>   shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing 
> threads = false] 
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request.   at 
> org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:204)
>  at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:286)
>at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:274)
>at 
> org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:174)
> at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=multiPartitionInputTopic, partition=1, offset=15, 
> stacktrace=java.lang.RuntimeException: Detected we've been interrupted.   
> at 
> org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892)
>at 
> org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867)
>at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
> at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>  {code}
> shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing 
> threads = false] 
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> StreamsTasks did not request commit. ==> expected:  but was: 
>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)   
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) 
> at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
> java.lang.IllegalStateException: Replica 
> [Topic=__transaction_state,Partition=2,Replica=1] should be in the 
> OfflineReplica,ReplicaDeletionStarted states before moving to 
> ReplicaDeletionIneligible state. Instead it is in OnlineReplica state   
> at 
> 

Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-26 Thread via GitHub


kirktrue commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1373924481


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = "generic";
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +
+"support GENERIC or CONSUMER. If CONSUMER is specified, then the 
consumer group protocol will be used.  " +

Review Comment:
   I think we should use the lower-case versions of these strings:
   
   ```suggestion
   "support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be used.  " +
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = "generic";
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +
+"support GENERIC or CONSUMER. If CONSUMER is specified, then the 
consumer group protocol will be used.  " +
+"Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server side assignor 
to use. It cannot be used in " +
+"conjunction with group.local.assignor. The group 
coordinator will choose the assignor if no " +

Review Comment:
   Two questions:
   
   1. Is `group.local.assignor` the same as `group.local.assignors`?
   2. If `group.local.assignors` isn't in this PR, should we just omit that 
sentence?



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),

Review Comment:
   You need a wider monitor, @philipnee 
   
   ```suggestion
   
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class)),
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),

Review Comment:
   You need a wider monitor, @philipnee 
   
   ```suggestion
   
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class)),
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String 

[PR] [KIP-954] support custom DSL store providers [kafka]

2023-10-26 Thread via GitHub


agavra opened a new pull request, #14648:
URL: https://github.com/apache/kafka/pull/14648

   ### Overview
   Implementation for 
[KIP-954](https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types)
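   
   A hedged sketch of what the KIP enables (the config key `dsl.store.suppliers.class` follows the KIP text, and `com.example.MyDslStoreSuppliers` is a made-up user class; exact names in the merged implementation may differ):
   
   ```
   import java.util.Properties;
   
   public class CustomDslStoreSuppliersConfig {
       public static Properties streamsProps() {
           final Properties props = new Properties();
           props.put("application.id", "kip-954-demo");           // placeholder
           props.put("bootstrap.servers", "localhost:9092");      // placeholder
           // Point the DSL's default store type at a custom DslStoreSuppliers class.
           props.put("dsl.store.suppliers.class", "com.example.MyDslStoreSuppliers");
           return props;
       }
   }
   ```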
   
   ### Testing Strategy
   * Updated the topology tests to ensure that the configuration is picked up 
in the topology builder
   * Manually built a Kafka Streams application using a custom `DslStoreSuppliers` class and verified that it was used
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka 15680 [kafka]

2023-10-26 Thread via GitHub


C0urante commented on PR #14630:
URL: https://github.com/apache/kafka/pull/14630#issuecomment-1782075383

   I can take a look next week. In the meantime, do you think you could add 
some unit tests for this change?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]

2023-10-26 Thread via GitHub


jolshan commented on code in PR #14627:
URL: https://github.com/apache/kafka/pull/14627#discussion_r1373924152


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -360,7 +360,9 @@ public short partitionRecordVersion() {
 }
 
 public short fetchRequestVersion() {
-if (this.isAtLeast(IBP_3_5_IV1)) {
+if (this.isAtLeast(IBP_3_7_IV0)) {

Review Comment:
   I just realized that if we set the version as unstable, we may not be able to use it here. 🤦‍♀️ Maybe we should remove the unstable-version setting if this causes issues in tests.
   
   Sorry for the confusion.



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -360,7 +360,9 @@ public short partitionRecordVersion() {
 }
 
 public short fetchRequestVersion() {
-if (this.isAtLeast(IBP_3_5_IV1)) {
+if (this.isAtLeast(IBP_3_7_IV0)) {

Review Comment:
   I just realized that if we set the version as unstable, we may not be able to use it here. 🤦‍♀️ Maybe we should remove the unstable-version setting if this causes issues in tests.
   
   Sorry for the confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14419) Failed SyncGroup leading to partitions lost due to processing during rebalances

2023-10-26 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14419:
---
Summary: Failed SyncGroup leading to partitions lost due to processing 
during rebalances  (was: Same message consumed again by the same stream task 
after partition is lost and reassigned)

> Failed SyncGroup leading to partitions lost due to processing during 
> rebalances
> ---
>
> Key: KAFKA-14419
> URL: https://issues.apache.org/jira/browse/KAFKA-14419
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>Reporter: Mikael
>Priority: Major
>
> Trigger scenario:
> Four Kafka client application instances on separate EC2 instances with a 
> total of 8 active and 8 standby stream tasks for the same stream topology, 
> consuming from an input topic with 8 partitions. Sometimes a handful of 
> messages are consumed twice by one of the stream tasks when stream tasks on 
> another application instance join the consumer group after an application 
> instance restart.
> Additional information:
> Messages are produced to the topic by another Kafka streams topology deployed 
> on the same four application instances. I have verified that each message is 
> only produced once by enabling debug logging in the topology flow right 
> before producing each message to the topic.
> Logs from stream thread with duplicate consumption:
>  
> {code:java}
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is 
> already rebalancing
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
> Input records consumed for the first time
> 2022-11-21 15:09:33,919 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Successfully joined group with 
> generation Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,920 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began 
> another rebalance. Need to re-join the group. Sent generation was 
> Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,922 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: 
> encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,922 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Request joining group due to: 
> encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,923 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as 
> lost since 

[jira] [Commented] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned

2023-10-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780119#comment-17780119
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14419:


[~Carlstedt] first off, do you by any chance have especially long processing 
latencies? For example iterating over a large range of a state store, making a 
blocking remote call, etc

Since you're using the default max poll interval, I might recommend starting 
out by lowering the max.poll.records first, as you generally don't want to make 
the max.poll.interval too small – ideally that would be the last resort.

IIRC the default max.poll.records in Streams is set to 1,000, so maybe try cutting that down to just 100? You can (and should) also experiment a bit to find a good balance for your app in the steady state. Though if you're seeing this frequently enough, or are able to reproduce it reliably, you could set it to something extremely low as a test, say 10, just to see if that solves the issue.
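
For example, a minimal sketch of lowering it in the Streams config (the application id and broker address below are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class LowPollRecordsConfig {
    public static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Streams forwards consumer configs to its consumers; consumerPrefix() also works.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        return props;
    }
}
{code}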

I'll try to put together a PR for this sometime soon, maybe early next week, so you can also wait for that and try running with a patched version of Streams. (Would a trunk/3.7 patched version be alright? I'm happy to create a branch with the fix ported to an earlier version of Kafka Streams if you'd prefer, just let me know which version you need.)

Btw: I'm going to update the ticket title if you don't mind, so that it 
reflects the bug described in my last response. The current title is just 
describing the correct behavior of Kafka Streams working as intended

> Same message consumed again by the same stream task after partition is lost 
> and reassigned
> --
>
> Key: KAFKA-14419
> URL: https://issues.apache.org/jira/browse/KAFKA-14419
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>Reporter: Mikael
>Priority: Major
>
> Trigger scenario:
> Four Kafka client application instances on separate EC2 instances with a 
> total of 8 active and 8 standby stream tasks for the same stream topology, 
> consuming from an input topic with 8 partitions. Sometimes a handful of 
> messages are consumed twice by one of the stream tasks when stream tasks on 
> another application instance join the consumer group after an application 
> instance restart.
> Additional information:
> Messages are produced to the topic by another Kafka streams topology deployed 
> on the same four application instances. I have verified that each message is 
> only produced once by enabling debug logging in the topology flow right 
> before producing each message to the topic.
> Logs from stream thread with duplicate consumption:
>  
> {code:java}
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is 
> already rebalancing
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
> Input records consumed for the first time
> 2022-11-21 15:09:33,919 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Successfully joined group with 
> generation Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,920 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began 
> another rebalance. Need to re-join the group. Sent generation was 
> Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  

[jira] [Comment Edited] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned

2023-10-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779731#comment-17779731
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-14419 at 10/26/23 10:49 PM:
---

Hey, sorry for the long delay – I'm still trying to catch up my memory of this 
ticket and the related one, but after looking at it again with fresh eyes I 
think I figured out what's going on here. If I'm reading this situation 
correctly, it does seem like there is some less-than-ideal behavior that we 
might be able to improve. 

Based on your recent logs, I think the root cause here is basically the same as what I fixed in https://github.com/apache/kafka/pull/12869, just to a lesser degree. The issue in that patch was that Streams would sometimes trigger a followup rebalance even while the current rebalance was still going on, which led some members to drop out of the group upon hitting a REBALANCE_IN_PROGRESS error during the SyncGroup phase. The fix basically just made the StreamThread wait until the rebalance was over before triggering a followup.

This should have been sufficient, but I suppose it is still theoretically 
possible to run into the same issue. Taking a deeper look at the original 
issue, it would only arise because of how Streams uses a non-blocking poll 
which allows it to return to its main loop and continue processing in the 
background during a rebalance. A lot of things happen throughout the loop, but 
the relevant operations here are as follows:
 # Check the rebalance "schedule" and trigger one if:
 ## it has been requested for a time equal to or less than the current time
 ## the consumer is not actively participating in a rebalance (ie sometime 
after a SyncGroup response is received but before sending a new JoinGroup 
request)
 # Poll for more records, during which time either or both of the following may 
occur:
 ## consumer enters a new rebalance by sending a JoinGroup request
 ## consumer participates in a rebalance by receiving the JoinGroup response 
and sending a SyncGroup request
 ## consumer completes an ongoing rebalance by receiving a SyncGroup response, 
after which it can commit offsets for revoked tasks and initialize new ones
 # Process more records, which might have been either:
 ## Newly-consumed during the last poll call, or
 ## Left over from a previous batch that could not be fully processed before 
needing to return to poll due to running out of time in the max.poll.interval

So here's what I'm imagining: let's say we have two consumers, A and B, with A being the group leader/assignor.
 # A new rebalance begins, and both threads send their JoinGroup requests 
before returning to process some records
 # A doesn't have many records left to process, so it quickly returns to the 
poll call in step 2 of the loop. However B is still processing a large backlog
 # A performs the assignment and determines that a followup rebalance is 
needed, so it sets the rebalance schedule to 
 # After the assignment, A sends it out in the SyncGroup request and exits the 
poll call
 # A does some processing (or not) before returning to the poll and receiving 
the SyncGroup response
 # A exits the poll again, and this time when it reaches step 1 of the loop, it 
is now able to trigger the new rebalance
 # After A has requested a new rebalance, it finally returns to the poll call 
one more time, and rejoins the group/sends a JoinGroup request to kick it off
 # This whole time, B has had a large backlog of records, or a very high 
max.poll.interval, or a long GC pause – you get the idea. It's stuck in step 3
 # B finally finishes processing and leaves step 3, returning to the poll call 
during which it sends a very late SyncGroup request.
 # When the SyncGroup response is eventually received, B gets the 
REBALANCE_IN_PROGRESS error and fails its rebalance since the generation is 
stale

The fundamental issue here is that B is theoretically able to spend up to the 
max.poll.interval between sending its SyncGroup request and returning to poll 
to process the SyncGroup response, but A might be able to process its SyncGroup 
response, process its records, and then trigger a new rebalance all in that 
timeframe. This could happen when the task assignment is heavily imbalanced, 
for example. 

I can see a few potential paths forward here, and a fourth option that is more 
of a temporary workaround for [~Carlstedt] if you're still encountering this. 
None of them are really a guarantee, but they would help. For the most 
comprehensive fix we might want to consider doing two or even all three of 
these:

Option 1: add a small delay to the Streams followup rebalance trigger to help 
the entire group finish the SyncGroup phase before beginning the next rebalance.

Option 2: set a shorter upper bound on the maximum time a StreamThread 

Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2023-10-26 Thread via GitHub


ocadaruma commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1373898465


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -308,7 +308,14 @@ public void truncateFromEnd(long endOffset) {
 if (endOffset >= 0 && epochEntry.isPresent() && 
epochEntry.get().startOffset >= endOffset) {
 List removedEntries = removeFromEnd(x -> 
x.startOffset >= endOffset);
 
-flush();
+// We intentionally don't force flushing change to the device 
here because:
+// - To avoid fsync latency
+//   * fsync latency could be huge on a disk glitch, which is 
not rare in spinning drives
+//   * This method is called by ReplicaFetcher threads, which 
could block replica fetching
+// then causing ISR shrink or high produce response time 
degradation in remote scope on high fsync latency.
+// - Even when stale epochs remained in LeaderEpoch file due 
to the unclean shutdown, it will be handled by
+//   another truncateFromEnd call on log loading procedure so 
it won't be a problem
+flush(false);

Review Comment:
   > those old entries could still exist in the file
   
   Yeah, precisely: the content on the device (not the file) could still be old. As long as we read the file in the usual way (i.e. not through O_DIRECT), we see the latest data.
   
   The staleness on the device only arises when the server experiences a power failure before the OS flushes the page cache.
   In that case, the content could indeed be rolled back to an old state.
   
   But it won't be a problem, because the leader-epoch file will be truncated again to match the log file during the loading procedure anyway (this is the case mentioned in (3) in the PR description).
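   
   To make the trade-off concrete, a minimal standalone sketch of a checkpoint writer that only pays the fsync cost when asked to (not the Kafka code; names are illustrative):
   
   ```
   import java.io.IOException;
   import java.nio.channels.FileChannel;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;
   
   public final class CheckpointWriter {
       public static void write(final Path file, final String contents, final boolean sync) throws IOException {
           try (FileChannel channel = FileChannel.open(file,
                   StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                   StandardOpenOption.TRUNCATE_EXISTING)) {
               // Always rewrite the file through the page cache...
               channel.write(StandardCharsets.UTF_8.encode(contents));
               if (sync) {
                   // ...but only force the data to the device when the caller requires it,
                   // avoiding fsync latency on hot paths such as replica fetching.
                   channel.force(true);
               }
           }
       }
   }
   ```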



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15690) EosIntegrationTest is flaky.

2023-10-26 Thread Calvin Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calvin Liu updated KAFKA-15690:
---
Description: 
EosIntegrationTest

shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2,
 processing threads = false]
{code:java}
org.junit.runners.model.TestTimedOutException: test timed out after 600 seconds 
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:)
at 
org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:821)
  at 
org.apache.kafka.clients.NetworkClient.processTimeoutDisconnection(NetworkClient.java:779)
   at 
org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:837)

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=0_1, processor=KSTREAM-SOURCE-00, 
topic=multiPartitionInputTopic, partition=1, offset=15, 
stacktrace=java.lang.RuntimeException: Detected we've been interrupted. at 
org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892)
   at 
org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867)
   at 
org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
at 
org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
at 
org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
 {code}
  shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing 
threads = false] 
{code:java}
java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request. at 
org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:204)
 at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:286)
   at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:274)
   at 
org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:174)
at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=0_1, processor=KSTREAM-SOURCE-00, 
topic=multiPartitionInputTopic, partition=1, offset=15, 
stacktrace=java.lang.RuntimeException: Detected we've been interrupted. at 
org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892)
   at 
org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867)
   at 
org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
at 
org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
at 
org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
 {code}
shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing threads 
= false] 
{code:java}
org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
StreamsTasks did not request commit. ==> expected:  but was:  at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) 
   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at 
app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)

java.lang.IllegalStateException: Replica 
[Topic=__transaction_state,Partition=2,Replica=1] should be in the 
OfflineReplica,ReplicaDeletionStarted states before moving to 
ReplicaDeletionIneligible state. Instead it is in OnlineReplica state at 
kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)
   at 
kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)
at 
kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)
at scala.collection.immutable.List.foreach(List.scala:333) {code}
These tests run long and may be related to timeouts.

  was:
Finding the following integration tests flaky.

EosIntegrationTest {
 * 
shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2,
 processing 

Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-26 Thread via GitHub


CalvinConfluent commented on PR #14603:
URL: https://github.com/apache/kafka/pull/14603#issuecomment-1781972169

   https://issues.apache.org/jira/browse/KAFKA-15699
   https://issues.apache.org/jira/browse/KAFKA-15700


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15700) FetchFromFollowerIntegrationTest is flaky

2023-10-26 Thread Calvin Liu (Jira)
Calvin Liu created KAFKA-15700:
--

 Summary: FetchFromFollowerIntegrationTest is flaky
 Key: KAFKA-15700
 URL: https://issues.apache.org/jira/browse/KAFKA-15700
 Project: Kafka
  Issue Type: Bug
Reporter: Calvin Liu


It may be related to an inappropriate timeout.

testRackAwareRangeAssignor(String).quorum=zk
{code:java}
java.util.concurrent.TimeoutException   at 
java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204)   at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$13(FetchFromFollowerIntegrationTest.scala:229)
 at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$13$adapted(FetchFromFollowerIntegrationTest.scala:228)
 at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at 
scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky

2023-10-26 Thread Calvin Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calvin Liu updated KAFKA-15699:
---
Description: 
It may be related to an inappropriate test timeout

testReplicateSourceDefault()
{code:java}
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be different, 
because it's in exclude filter! ==> expected: not equal but was: <8640>
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testReplicateSourceDefault$9(MirrorConnectorsIntegrationBaseTest.java:826){code}
testOffsetSyncsTopicsOnTarget() 
{code:java}
java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, 
deadlineMs=1698275006778, tries=1, nextAllowedTryMs=1698275715972) timed out at 
1698275715878 after 1 attempt(s)  at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
   at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276)
  at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235)
  at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149)
  at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103){code}

  was:
May relate to inappropriate 

testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget()  }


> MirrorConnectorsIntegrationBaseTest is flaky
> 
>
> Key: KAFKA-15699
> URL: https://issues.apache.org/jira/browse/KAFKA-15699
> Project: Kafka
>  Issue Type: Bug
>Reporter: Calvin Liu
>Priority: Major
>
> It may be related to an inappropriate test timeout.
> testReplicateSourceDefault()
> {code:java}
> org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 
> different, because it's in exclude filter! ==> expected: not equal but was: 
> <8640>
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testReplicateSourceDefault$9(MirrorConnectorsIntegrationBaseTest.java:826){code}
> testOffsetSyncsTopicsOnTarget() 
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, 
> deadlineMs=1698275006778, tries=1, nextAllowedTryMs=1698275715972) timed out 
> at 1698275715878 after 1 attempt(s)at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
>at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149)
>   at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky

2023-10-26 Thread Calvin Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calvin Liu updated KAFKA-15699:
---
Description: 
May relate to inappropriate 

testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget()  }

> MirrorConnectorsIntegrationBaseTest is flaky
> 
>
> Key: KAFKA-15699
> URL: https://issues.apache.org/jira/browse/KAFKA-15699
> Project: Kafka
>  Issue Type: Bug
>Reporter: Calvin Liu
>Priority: Major
>
> May relate to inappropriate 
> testReplicateSourceDefault() * testOffsetSyncsTopicsOnTarget()  }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]

2023-10-26 Thread via GitHub


mumrah merged PR #14545:
URL: https://github.com/apache/kafka/pull/14545


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15690) EosIntegrationTest is flaky.

2023-10-26 Thread Calvin Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calvin Liu updated KAFKA-15690:
---
Summary: EosIntegrationTest is flaky.  (was: Flaky integration tests)

> EosIntegrationTest is flaky.
> 
>
> Key: KAFKA-15690
> URL: https://issues.apache.org/jira/browse/KAFKA-15690
> Project: Kafka
>  Issue Type: Bug
>Reporter: Calvin Liu
>Priority: Major
>
> The following integration tests appear to be flaky.
> EosIntegrationTest {
>  * 
> shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2,
>  processing threads = false] 
>  * shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing 
> threads = false] 
>  * shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing 
> threads = false] 
> }
> MirrorConnectorsIntegrationBaseTest {
>  * testReplicateSourceDefault()
>  * testOffsetSyncsTopicsOnTarget() 
> }
> FetchFromFollowerIntegrationTest {
>  * testRackAwareRangeAssignor(String).quorum=zk
> }
> These tests run for a long time and may be related to timeouts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky

2023-10-26 Thread Calvin Liu (Jira)
Calvin Liu created KAFKA-15699:
--

 Summary: MirrorConnectorsIntegrationBaseTest is flaky
 Key: KAFKA-15699
 URL: https://issues.apache.org/jira/browse/KAFKA-15699
 Project: Kafka
  Issue Type: Bug
Reporter: Calvin Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]

2023-10-26 Thread via GitHub


mumrah commented on PR #14545:
URL: https://github.com/apache/kafka/pull/14545#issuecomment-1781961533

   Test failures look unrelated. 
   
   https://github.com/apache/kafka/assets/55116/b0eb9cec-76df-4d5e-ab73-dfd2ca4d5978
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]

2023-10-26 Thread via GitHub


mumrah commented on code in PR #14545:
URL: https://github.com/apache/kafka/pull/14545#discussion_r1373863647


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1216,15 +1228,21 @@ class LogManager(logDirs: Seq[File],
 cleaner.updateCheckpoints(removedLog.parentDirFile, 
partitionToRemove = Option(topicPartition))
   }
 }
-removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), 
false)
+if (isStray) {

Review Comment:
   For now, the desired behavior is to stop tracking the partition but not 
delete the files. Since migrations are one-off and inherently risky, I 
didn't want to take any destructive actions like deleting the logs (immediately 
or delayed). The stray'd partitions are logged at the INFO level when they are 
detected, and at WARN on subsequent startups. 
   
   This gives operators the information needed to clean up stray 
partitions if desired. 
   
   I filed https://issues.apache.org/jira/browse/KAFKA-15698 to track automatic 
clean up of the stray partitions.
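   
   A rough Java sketch of the behavior described above; the `StrayPartitionTracker` 
class and its method are invented purely for illustration (the actual change lives 
in `LogManager.scala`):
```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical names for illustration only; the real logic is in LogManager.scala.
public final class StrayPartitionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(StrayPartitionTracker.class);

    private final Set<String> trackedPartitions = ConcurrentHashMap.newKeySet();

    /** Called when a partition is detected as stray during or after the migration. */
    public void markStray(String topicPartition, boolean firstDetection) {
        // Stop tracking the partition, but deliberately leave its log files on disk.
        trackedPartitions.remove(topicPartition);
        if (firstDetection) {
            LOG.info("Detected stray partition {}; it is no longer tracked, but its " +
                    "log files have been left in place.", topicPartition);
        } else {
            LOG.warn("Stray partition {} from a previous migration is still on disk; " +
                    "it can be cleaned up manually if desired.", topicPartition);
        }
    }
}
```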



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15698) KRaft mode brokers should clean up stray partitions from migration

2023-10-26 Thread David Arthur (Jira)
David Arthur created KAFKA-15698:


 Summary: KRaft mode brokers should clean up stray partitions from 
migration
 Key: KAFKA-15698
 URL: https://issues.apache.org/jira/browse/KAFKA-15698
 Project: Kafka
  Issue Type: Improvement
Reporter: David Arthur


Follow up to KAFKA-15605. After the brokers are migrated to KRaft and the 
migration is completed, we should let the brokers clean up any partitions that 
we marked as "stray" during the migration. This would be any partition that was 
being deleted when the migration began, or any partition that was deleted, but 
not seen as deleted by StopReplica (e.g., broker down).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15659: Fix shouldInvokeUserDefinedGlobalStateRestoreListener flaky test [kafka]

2023-10-26 Thread via GitHub


ableegoldman merged PR #14608:
URL: https://github.com/apache/kafka/pull/14608


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15659: Fix shouldInvokeUserDefinedGlobalStateRestoreListener flaky test [kafka]

2023-10-26 Thread via GitHub


ableegoldman commented on code in PR #14608:
URL: https://github.com/apache/kafka/pull/14608#discussion_r1373854238


##
streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java:
##
@@ -523,13 +527,13 @@ public void 
shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception
 
 final Map kafkaStreams1Configuration = mkMap(
 mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(appId).getPath() + "-ks1"),
-
mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), 
appId + "-ks1"),

Review Comment:
   ah, yeah, good catch -- we should definitely not be using the 
`group.instance.id` config in tests; turning on static membership will delay 
the test at best (and mess with it at worst)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15659: Fix shouldInvokeUserDefinedGlobalStateRestoreListener flaky test [kafka]

2023-10-26 Thread via GitHub


ableegoldman commented on PR #14608:
URL: https://github.com/apache/kafka/pull/14608#issuecomment-1781945353

   Test failures are unrelated, and the relevant test passed in all cases, so 
let's merge this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15659: Fix shouldInvokeUserDefinedGlobalStateRestoreListener flaky test [kafka]

2023-10-26 Thread via GitHub


ableegoldman commented on code in PR #14608:
URL: https://github.com/apache/kafka/pull/14608#discussion_r1373854238


##
streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java:
##
@@ -523,13 +527,13 @@ public void 
shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception
 
 final Map kafkaStreams1Configuration = mkMap(
 mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(appId).getPath() + "-ks1"),
-
mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), 
appId + "-ks1"),

Review Comment:
   ah, yeah, good catch -- we should definitely not be using the 
`group.instance.id` config in tests, it turns on static membership which will 
delay the test at best (and mess with it at worst)
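   
   A minimal sketch of the configuration change being suggested, assuming a 
hypothetical `RestoreTestConfigs` helper and an `appId` supplied by the test; the 
point is only that `group.instance.id` is left unset so static membership never 
kicks in:
```java
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;

import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.test.TestUtils;

// Hypothetical helper, not part of the PR: the per-instance Streams config is
// built without group.instance.id, so static membership stays disabled and
// rebalances are not delayed by the session timeout.
final class RestoreTestConfigs {
    static Map<String, Object> kafkaStreams1Configuration(final String appId) {
        final String stateDir = TestUtils.tempDirectory(appId).getPath() + "-ks1";
        return mkMap(mkEntry(StreamsConfig.STATE_DIR_CONFIG, (Object) stateDir));
    }
}
```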



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-26 Thread via GitHub


dongnuo123 commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1373846331


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4385,15 +4451,39 @@ class KafkaApisTest {
 .setGroupId("group-3")
 .setErrorCode(Errors.INVALID_GROUP_ID.code)
 
-  val expectedOffsetFetchResponse = new OffsetFetchResponseData()
-.setGroups(List(group1Response, group2Response, group3Response).asJava)
+  val group4Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()

Review Comment:
   group3Response and group4Response are the same. I wanted to make sure when
```
   group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)
   group4Future.complete(group4Response)
   ```
   handleOffsetFetch gives the same response.
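   
   A standalone Java sketch of the scenario being described (the `resolve` helper 
and class name are made up; this is not the actual KafkaApis code): both completion 
paths end up carrying the same INVALID_GROUP_ID error.
```java
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.protocol.Errors;

// Standalone sketch of the situation exercised by the test; "resolve" is a
// made-up helper, not the actual request handling code.
public final class OffsetFetchErrorSketch {

    static OffsetFetchResponseData.OffsetFetchResponseGroup resolve(
            String groupId,
            CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future) {
        // If the future failed, translate the exception into an error-only group,
        // which is what an exceptionally completed group-3 should produce.
        return future.exceptionally(ex -> {
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            return new OffsetFetchResponseData.OffsetFetchResponseGroup()
                .setGroupId(groupId)
                .setErrorCode(Errors.forException(cause).code());
        }).join();
    }

    public static void main(String[] args) {
        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> group3Future =
            new CompletableFuture<>();
        CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> group4Future =
            new CompletableFuture<>();

        // group-3 fails exceptionally; group-4 completes with an explicit error response.
        group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception());
        group4Future.complete(new OffsetFetchResponseData.OffsetFetchResponseGroup()
            .setGroupId("group-4")
            .setErrorCode(Errors.INVALID_GROUP_ID.code()));

        // Both paths yield the same INVALID_GROUP_ID error code.
        System.out.println(resolve("group-3", group3Future).errorCode());
        System.out.println(resolve("group-4", group4Future).errorCode());
    }
}
```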



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373822529


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -407,36 +414,41 @@ object StorageTool extends Logging {
 if (directories.isEmpty) {
   throw new TerseFailure("No log directories found in the configuration.")
 }
-
-val unformattedDirectories = directories.filter(directory => {
-  if (!Files.isDirectory(Paths.get(directory)) || 
!Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) {
-  true
-  } else if (!ignoreFormatted) {
-throw new TerseFailure(s"Log directory $directory is already 
formatted. " +
-  "Use --ignore-formatted to ignore this directory and format the 
others.")
-  } else {
-false
-  }
-})
-if (unformattedDirectories.isEmpty) {
+val loader = new MetaPropertiesEnsemble.Loader()
+directories.foreach(loader.addLogDir(_))
+val metaPropertiesEnsemble = loader.load()
+metaPropertiesEnsemble.verify(metaProperties.clusterId(), 
metaProperties.nodeId(),
+  util.EnumSet.noneOf(classOf[VerificationFlag]))
+
+val copier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble)
+if (!(ignoreFormatted || copier.logDirProps().isEmpty)) {
+  val firstLogDir = copier.logDirProps().keySet().iterator().next()
+  throw new TerseFailure(s"Log directory ${firstLogDir} directory is 
already formatted. " +
+"Use --ignore-formatted to ignore this directory and format the 
others.")
+}
+if (!copier.errorLogDirs().isEmpty) {
+  val firstLogDir = copier.errorLogDirs().iterator().next()
+  throw new TerseFailure(s"I/O error trying to read log directory 
${firstLogDir}.")
+}
+if (copier.emptyLogDirs().isEmpty) {
   stream.println("All of the log directories are already formatted.")
+} else {
+  copier.emptyLogDirs().forEach(logDir => {
+val newMetaProperties = new MetaProperties.Builder(metaProperties).
+  setDirectoryId(copier.generateValidDirectoryId()).
+  build()
+copier.logDirProps().put(logDir, newMetaProperties)

Review Comment:
   Fair point. I added a setter function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-26 Thread via GitHub


dongnuo123 commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1373847099


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4385,15 +4451,39 @@ class KafkaApisTest {
 .setGroupId("group-3")
 .setErrorCode(Errors.INVALID_GROUP_ID.code)
 
-  val expectedOffsetFetchResponse = new OffsetFetchResponseData()
-.setGroups(List(group1Response, group2Response, group3Response).asJava)
+  val group4Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+.setGroupId("group-4")
+.setErrorCode(Errors.INVALID_GROUP_ID.code)
+
+  val group5Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()

Review Comment:
   Yeah, you're right. Let me extract it into a separate method. We probably don't 
need this group 5



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-26 Thread via GitHub


dongnuo123 commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1373846331


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4385,15 +4451,39 @@ class KafkaApisTest {
 .setGroupId("group-3")
 .setErrorCode(Errors.INVALID_GROUP_ID.code)
 
-  val expectedOffsetFetchResponse = new OffsetFetchResponseData()
-.setGroups(List(group1Response, group2Response, group3Response).asJava)
+  val group4Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()

Review Comment:
   group3Response and group4Response are the same. I wanted to make sure that when
```
   group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)
   group4Future.complete(group4Response)
   ```
   handleOffsetFetch gives the same response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15316) CommitRequestManager not calling RequestState callbacks

2023-10-26 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780100#comment-17780100
 ] 

Philip Nee commented on KAFKA-15316:


I think this is fixed. I will check

> CommitRequestManager not calling RequestState callbacks 
> 
>
> Key: KAFKA-15316
> URL: https://issues.apache.org/jira/browse/KAFKA-15316
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> CommitRequestManager is not triggering the RequestState callbacks that update 
> {_}lastReceivedMs{_}, affecting the _canSendRequest_ verification of the 
> RequestState



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15316) CommitRequestManager not calling RequestState callbacks

2023-10-26 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-15316:
--

Assignee: Philip Nee

> CommitRequestManager not calling RequestState callbacks 
> 
>
> Key: KAFKA-15316
> URL: https://issues.apache.org/jira/browse/KAFKA-15316
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> CommitRequestManager is not triggering the RequestState callbacks that update 
> {_}lastReceivedMs{_}, affecting the _canSendRequest_ verification of the 
> RequestState



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect

2023-10-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15556:
--
Labels: consumer-threading-refactor kip-848  (was: 
consumer-threading-refactor kip-848-preview)

> Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, 
> and tryConnect
> -
>
> Key: KAFKA-15556
> URL: https://issues.apache.org/jira/browse/KAFKA-15556
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848
>
> The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to 
> handle networking details in a more centralized way. However, in order to 
> reuse code between the existing {{KafkaConsumer}} and the new 
> {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the 
> {{NetworkClientDelegate}} capitulated and -stole- copied three methods from 
> {{ConsumerNetworkClient}} related to detecting node status:
>  # {{isUnavailable}}
>  # {{maybeThrowAuthFailure}}
>  # {{tryConnect}}
> Unfortunately, these have found their way into the {{FetchRequestManager}} 
> and {{OffsetsRequestManager}} implementations. We should review if we can 
> clean up—or even remove—this leaky abstraction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect

2023-10-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15556:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-848)

> Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, 
> and tryConnect
> -
>
> Key: KAFKA-15556
> URL: https://issues.apache.org/jira/browse/KAFKA-15556
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to 
> handle networking details in a more centralized way. However, in order to 
> reuse code between the existing {{KafkaConsumer}} and the new 
> {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the 
> {{NetworkClientDelegate}} capitulated and -stole- copied three methods from 
> {{ConsumerNetworkClient}} related to detecting node status:
>  # {{isUnavailable}}
>  # {{maybeThrowAuthFailure}}
>  # {{tryConnect}}
> Unfortunately, these have found their way into the {{FetchRequestManager}} 
> and {{OffsetsRequestManager}} implementations. We should review if we can 
> clean up—or even remove—this leaky abstraction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15598 & KAFKA-15461: Add integration tests for DescribeGroups API, DeleteGroups API, OffsetDelete API and ListGroups API [kafka]

2023-10-26 Thread via GitHub


dongnuo123 commented on code in PR #14537:
URL: https://github.com/apache/kafka/pull/14537#discussion_r1373833449


##
core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala:
##
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetDeleteRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "60"),
+new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "60"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testOffsetDelete(true)
+  }
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testOffsetDelete(false)
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+testOffsetDelete(false)
+  }
+
+  private def testOffsetDelete(useNewProtocol: Boolean): Unit = {
+if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+  fail("Cannot use the new protocol with the old group coordinator.")
+}
+
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+createOffsetsTopic()
+
+// Create the topic.
+createTopic(
+  topic = "foo",
+  numPartitions = 3
+)
+
+for (version <- ApiKeys.OFFSET_DELETE.oldestVersion() to 
ApiKeys.OFFSET_DELETE.latestVersion(isUnstableApiEnabled)) {
+
+  // Join the consumer group. Note that we don't heartbeat here so we must 
use
+  // a session long enough for the duration of the test.
+  val (memberId, memberEpoch) = joinConsumerGroup(
+groupId = "grp",
+useNewProtocol = useNewProtocol
+  )
+
+  // Commit offsets.
+  for (partitionId <- 0 to 2) {
+commitOffset(
+  groupId = "grp",
+  memberId = memberId,
+  memberEpoch = memberEpoch,
+  topic = "foo",
+  partition = partitionId,
+  offset = 100L + partitionId,
+  expectedError = Errors.NONE,
+  version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+)
+  }
+
+  // Delete offset with topic that the group is subscribed to.
+  deleteOffset(
+groupId = "grp",
+topic = "foo",
+partition = 0,
+

Re: [PR] KAFKA-15583: Enforce HWM advance only if partition is not under-min-ISR [kafka]

2023-10-26 Thread via GitHub


CalvinConfluent commented on code in PR #14594:
URL: https://github.com/apache/kafka/pull/14594#discussion_r1373832091


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1106,6 +1115,12 @@ class Partition(val topicPartition: TopicPartition,
* @return true if the HW was incremented, and false otherwise.
*/
   private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: 
Long = time.milliseconds): Boolean = {
+if (metadataCache.isInstanceOf[KRaftMetadataCache] && 
interBrokerProtocolVersion.isElrSupported && eligibleLeaderReplicasEnabled) {

Review Comment:
   Agreed. I've now made it a general requirement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-10-26 Thread via GitHub


C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1373831428


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -528,72 +483,64 @@ public void testRestartConnectorAndTasksNoStatus() throws 
Exception {
 ExecutionException ee = assertThrows(ExecutionException.class, () -> 
restartCallback.get(1000L, TimeUnit.MILLISECONDS));
 assertTrue(ee.getCause() instanceof NotFoundException);
 assertTrue(ee.getMessage().contains("Status for connector"));
-PowerMock.verifyAll();
 }
 
 @Test
 public void testRestartConnectorAndTasksNoRestarts() throws Exception {
 RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes();
-
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes();
-
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-EasyMock.expect(herder.buildRestartPlan(restartRequest))
-.andReturn(Optional.of(restartPlan)).anyTimes();
-
-connector = PowerMock.createMock(BogusSinkConnector.class);
+RestartPlan restartPlan = mock(RestartPlan.class);
+ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+when(restartPlan.shouldRestartConnector()).thenReturn(false);
+when(restartPlan.shouldRestartTasks()).thenReturn(false);
+
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
+
+connector = mock(BogusSinkConnector.class);
 expectAdd(SourceSink.SINK);
 
 Map connectorConfig = connectorConfig(SourceSink.SINK);
-Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+Connector connectorMock = mock(SinkConnector.class);
 expectConfigValidation(connectorMock, true, connectorConfig);
 
-PowerMock.replayAll();
-
 herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
 Herder.Created connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
 assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
 
 FutureCallback restartCallback = new 
FutureCallback<>();
 herder.restartConnectorAndTasks(restartRequest, restartCallback);
 assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-PowerMock.verifyAll();
 }
 
 @Test
 public void testRestartConnectorAndTasksOnlyConnector() throws Exception {
 RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes();
-
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes();
-
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-EasyMock.expect(herder.buildRestartPlan(restartRequest))
-.andReturn(Optional.of(restartPlan)).anyTimes();
+RestartPlan restartPlan = mock(RestartPlan.class);
+ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+when(restartPlan.shouldRestartConnector()).thenReturn(true);
+when(restartPlan.shouldRestartTasks()).thenReturn(false);
+
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
 
 herder.onRestart(CONNECTOR_NAME);
-EasyMock.expectLastCall();
+verify(statusBackingStore).put(new ConnectorStatus(CONNECTOR_NAME, 
ConnectorStatus.State.RESTARTING, WORKER_ID, 0));

Review Comment:
   I think we can just get rid of this part? This isn't really relevant to this 
test case and it doesn't provide super strong guarantees (especially if we 
explicitly verify that we're emitting the `RESTARTING` state in the status 
store like suggested a few lines below).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15598 & KAFKA-15461: Add integration tests for DescribeGroups API, DeleteGroups API, OffsetDelete API and ListGroups API [kafka]

2023-10-26 Thread via GitHub


dongnuo123 commented on code in PR #14537:
URL: https://github.com/apache/kafka/pull/14537#discussion_r1373831315


##
core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala:
##
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetDeleteRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "60"),
+new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "60"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testOffsetDelete(true)
+  }
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testOffsetDelete(false)
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+testOffsetDelete(false)
+  }
+
+  private def testOffsetDelete(useNewProtocol: Boolean): Unit = {
+if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+  fail("Cannot use the new protocol with the old group coordinator.")
+}
+
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+createOffsetsTopic()
+
+// Create the topic.
+createTopic(
+  topic = "foo",
+  numPartitions = 3
+)
+
+for (version <- ApiKeys.OFFSET_DELETE.oldestVersion() to 
ApiKeys.OFFSET_DELETE.latestVersion(isUnstableApiEnabled)) {
+
+  // Join the consumer group. Note that we don't heartbeat here so we must 
use
+  // a session long enough for the duration of the test.
+  val (memberId, memberEpoch) = joinConsumerGroup(
+groupId = "grp",
+useNewProtocol = useNewProtocol
+  )
+
+  // Commit offsets.
+  for (partitionId <- 0 to 2) {
+commitOffset(
+  groupId = "grp",
+  memberId = memberId,
+  memberEpoch = memberEpoch,
+  topic = "foo",
+  partition = partitionId,
+  offset = 100L + partitionId,
+  expectedError = Errors.NONE,
+  version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+)
+  }
+
+  // Delete offset with topic that the group is subscribed to.
+  deleteOffset(
+groupId = "grp",
+topic = "foo",
+partition = 0,
+

Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-10-26 Thread via GitHub


C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1373830704


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -528,72 +483,64 @@ public void testRestartConnectorAndTasksNoStatus() throws 
Exception {
 ExecutionException ee = assertThrows(ExecutionException.class, () -> 
restartCallback.get(1000L, TimeUnit.MILLISECONDS));
 assertTrue(ee.getCause() instanceof NotFoundException);
 assertTrue(ee.getMessage().contains("Status for connector"));
-PowerMock.verifyAll();
 }
 
 @Test
 public void testRestartConnectorAndTasksNoRestarts() throws Exception {
 RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes();
-
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes();
-
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-EasyMock.expect(herder.buildRestartPlan(restartRequest))
-.andReturn(Optional.of(restartPlan)).anyTimes();
-
-connector = PowerMock.createMock(BogusSinkConnector.class);
+RestartPlan restartPlan = mock(RestartPlan.class);
+ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+when(restartPlan.shouldRestartConnector()).thenReturn(false);
+when(restartPlan.shouldRestartTasks()).thenReturn(false);
+
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
+
+connector = mock(BogusSinkConnector.class);
 expectAdd(SourceSink.SINK);
 
 Map connectorConfig = connectorConfig(SourceSink.SINK);
-Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+Connector connectorMock = mock(SinkConnector.class);
 expectConfigValidation(connectorMock, true, connectorConfig);
 
-PowerMock.replayAll();
-
 herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
 Herder.Created connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
 assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
 
 FutureCallback restartCallback = new 
FutureCallback<>();
 herder.restartConnectorAndTasks(restartRequest, restartCallback);
 assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-PowerMock.verifyAll();
 }
 
 @Test
 public void testRestartConnectorAndTasksOnlyConnector() throws Exception {
 RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes();
-
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes();
-
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-EasyMock.expect(herder.buildRestartPlan(restartRequest))
-.andReturn(Optional.of(restartPlan)).anyTimes();
+RestartPlan restartPlan = mock(RestartPlan.class);
+ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+when(restartPlan.shouldRestartConnector()).thenReturn(true);
+when(restartPlan.shouldRestartTasks()).thenReturn(false);
+
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
 
 herder.onRestart(CONNECTOR_NAME);
-EasyMock.expectLastCall();
+verify(statusBackingStore).put(new ConnectorStatus(CONNECTOR_NAME, 
ConnectorStatus.State.RESTARTING, WORKER_ID, 0));
 
-connector = PowerMock.createMock(BogusSinkConnector.class);
+doNothing().when(herder).onRestart(CONNECTOR_NAME);

Review Comment:
   Instead of mocking this method on the class we're testing, can we let the 
real method be invoked and add a verification at the end of the test? It could 
be something like this:
   
   ```java
   ArgumentCaptor<ConnectorStatus> connectorStatus = ArgumentCaptor.forClass(ConnectorStatus.class);
   verify(statusBackingStore).put(connectorStatus.capture());
   assertEquals(CONNECTOR_NAME, connectorStatus.getValue().id());
   assertEquals(AbstractStatus.State.RESTARTING, 
connectorStatus.getValue().state());
   ```
   
   We can 

Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373824387


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -267,35 +267,27 @@ class LogManager(logDirs: Seq[File],
   /**
* Retrieves the Uuid for the directory, given its absolute path.
*/
-  def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
+  def directoryId(dir: String): Option[Uuid] = directoryIds.get(dir)
 
   /**
* Determine directory ID for each directory with a meta.properties.
-   * If meta.properties does not include a directory ID, one is generated and 
persisted back to meta.properties.
-   * Directories without a meta.properties don't get a directory ID assigned.
*/
-  private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
-dirs.flatMap { dir =>
+  private def loadDirectoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+val result = mutable.HashMap[String, Uuid]()
+dirs.foreach(dir => {
   try {
-val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, 
KafkaServer.brokerMetaPropsFile))
-metadataCheckpoint.read().map { props =>
-  val rawMetaProperties = new RawMetaProperties(props)
-  val uuid = rawMetaProperties.directoryId match {
-case Some(uuidStr) => Uuid.fromString(uuidStr)
-case None =>
-  val uuid = Uuid.randomUuid()
-  rawMetaProperties.directoryId = uuid.toString
-  metadataCheckpoint.write(rawMetaProperties.props)
-  uuid
-  }
-  dir.getAbsolutePath -> uuid
-}.toMap
+val props = PropertiesUtils.readPropertiesFile(
+  new File(dir, 
MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath)
+val metaProps = new MetaProperties.Builder(props).build()
+metaProps.directoryId().ifPresent(directoryId => {
+  result += (dir.getAbsolutePath -> directoryId)
+})

Review Comment:
   Yes, I agree it should have a directory ID by this point, during the course 
of normal operation. By handling the no-ID case, I was trying to avoid unit 
tests failing. (To be clear, I haven't tested if they do fail, but I thought 
they might.) 
   
   Maybe we could remove this as a follow-on?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373824387


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -267,35 +267,27 @@ class LogManager(logDirs: Seq[File],
   /**
* Retrieves the Uuid for the directory, given its absolute path.
*/
-  def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
+  def directoryId(dir: String): Option[Uuid] = directoryIds.get(dir)
 
   /**
* Determine directory ID for each directory with a meta.properties.
-   * If meta.properties does not include a directory ID, one is generated and 
persisted back to meta.properties.
-   * Directories without a meta.properties don't get a directory ID assigned.
*/
-  private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
-dirs.flatMap { dir =>
+  private def loadDirectoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+val result = mutable.HashMap[String, Uuid]()
+dirs.foreach(dir => {
   try {
-val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, 
KafkaServer.brokerMetaPropsFile))
-metadataCheckpoint.read().map { props =>
-  val rawMetaProperties = new RawMetaProperties(props)
-  val uuid = rawMetaProperties.directoryId match {
-case Some(uuidStr) => Uuid.fromString(uuidStr)
-case None =>
-  val uuid = Uuid.randomUuid()
-  rawMetaProperties.directoryId = uuid.toString
-  metadataCheckpoint.write(rawMetaProperties.props)
-  uuid
-  }
-  dir.getAbsolutePath -> uuid
-}.toMap
+val props = PropertiesUtils.readPropertiesFile(
+  new File(dir, 
MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath)
+val metaProps = new MetaProperties.Builder(props).build()
+metaProps.directoryId().ifPresent(directoryId => {
+  result += (dir.getAbsolutePath -> directoryId)
+})

Review Comment:
   Yes, I agree it should have a directory ID by this point, during the course 
of normal operation. By handling the no-ID case, I was trying to avoid unit 
tests failing. Maybe we could remove this as a follow-on?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373822529


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -407,36 +414,41 @@ object StorageTool extends Logging {
 if (directories.isEmpty) {
   throw new TerseFailure("No log directories found in the configuration.")
 }
-
-val unformattedDirectories = directories.filter(directory => {
-  if (!Files.isDirectory(Paths.get(directory)) || 
!Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) {
-  true
-  } else if (!ignoreFormatted) {
-throw new TerseFailure(s"Log directory $directory is already 
formatted. " +
-  "Use --ignore-formatted to ignore this directory and format the 
others.")
-  } else {
-false
-  }
-})
-if (unformattedDirectories.isEmpty) {
+val loader = new MetaPropertiesEnsemble.Loader()
+directories.foreach(loader.addLogDir(_))
+val metaPropertiesEnsemble = loader.load()
+metaPropertiesEnsemble.verify(metaProperties.clusterId(), 
metaProperties.nodeId(),
+  util.EnumSet.noneOf(classOf[VerificationFlag]))
+
+val copier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble)
+if (!(ignoreFormatted || copier.logDirProps().isEmpty)) {
+  val firstLogDir = copier.logDirProps().keySet().iterator().next()
+  throw new TerseFailure(s"Log directory ${firstLogDir} directory is 
already formatted. " +
+"Use --ignore-formatted to ignore this directory and format the 
others.")
+}
+if (!copier.errorLogDirs().isEmpty) {
+  val firstLogDir = copier.errorLogDirs().iterator().next()
+  throw new TerseFailure(s"I/O error trying to read log directory 
${firstLogDir}.")
+}
+if (copier.emptyLogDirs().isEmpty) {
   stream.println("All of the log directories are already formatted.")
+} else {
+  copier.emptyLogDirs().forEach(logDir => {
+val newMetaProperties = new MetaProperties.Builder(metaProperties).
+  setDirectoryId(copier.generateValidDirectoryId()).
+  build()
+copier.logDirProps().put(logDir, newMetaProperties)

Review Comment:
   It seems a bit cumbersome to put accessors on all the maps. But I'm open to 
ideas.
   
   I do wish Java had some way of flagging this map as different from the 
immutable ones. Kind of like `const` in C++, or yes, the whole menagerie of 
immutable/mutable Scala classes. Although that comes with its own set of 
problems.
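   
   Not part of this PR, but for reference, the closest the JDK gets to flagging a 
map as read-only is an unmodifiable view or an immutable copy:
```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Small sketch of the standard-library options for signaling read-only intent.
public final class ReadOnlyMapSketch {
    public static void main(String[] args) {
        Map<String, String> mutable = new HashMap<>();
        mutable.put("log.dir", "/tmp/kafka-logs");

        // Live unmodifiable view: writes through it throw
        // UnsupportedOperationException, but it still reflects later changes
        // made to the backing map.
        Map<String, String> view = Collections.unmodifiableMap(mutable);

        // Detached immutable snapshot (Java 10+): never changes afterwards.
        Map<String, String> snapshot = Map.copyOf(mutable);

        System.out.println(view.get("log.dir") + " / " + snapshot.size());
    }
}
```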



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15691) Upgrade existing and add new system tests to use new consumer

2023-10-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15691:
--
Summary: Upgrade existing and add new system tests to use new consumer  
(was: Upgrade existing and add new system tests to use new coordinator)

> Upgrade existing and add new system tests to use new consumer
> -
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Minor
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373820141


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -228,23 +226,26 @@ class KafkaServer(
 info(s"Cluster ID = $clusterId")
 
 /* load metadata */
-val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) =
-  
BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(config.logDirs, 
ignoreMissing = true, kraftMode = false)
-
-if (preloadedBrokerMetadataCheckpoint.version != 0) {
-  throw new RuntimeException(s"Found unexpected version in loaded 
`meta.properties`: " +
-s"$preloadedBrokerMetadataCheckpoint. Zk-based brokers only 
support version 0 " +
-"(which is implicit when the `version` field is missing).")
+val initialMetaPropsEnsemble = {
+  val loader = new MetaPropertiesEnsemble.Loader()
+  config.logDirs.foreach(loader.addLogDir(_))
+  loader.load()
 }

Review Comment:
   yes. good catch. @pprovenzano also found this bug through testing :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373816778


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogDirFailureChannel.java:
##
@@ -57,7 +56,7 @@ public boolean hasOfflineLogDir(String logDir) {
  * @param msg Error message.
  * @param e Exception instance.
  */
-public void maybeAddOfflineLogDir(String logDir, String msg, IOException 
e) {
+public void maybeAddOfflineLogDir(String logDir, String msg, Exception e) {

Review Comment:
   Fair enough. I changed it above to just catch IOE so I will change it here 
as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373817350


##
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##
@@ -135,39 +136,52 @@ object KafkaRaftServer {
* @return A tuple containing the loaded meta properties (which are 
guaranteed to
* be consistent across all log dirs) and the offline directories
*/
-  def initializeLogDirs(config: KafkaConfig): (MetaProperties, 
BootstrapMetadata, Seq[String]) = {
-val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
-val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
-  getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false, 
kraftMode = true)
-
-if (offlineDirs.contains(config.metadataLogDir)) {
-  throw new KafkaException("Cannot start server since `meta.properties` 
could not be " +
-s"loaded from ${config.metadataLogDir}")
+  def initializeLogDirs(config: KafkaConfig): (MetaPropertiesEnsemble, 
BootstrapMetadata) = {
+// Load and verify the original ensemble.
+val loader = new MetaPropertiesEnsemble.Loader()
+loader.addMetadataLogDir(config.metadataLogDir)
+config.logDirs.foreach(loader.addLogDir(_))
+val initialMetaPropsEnsemble = loader.load()
+initialMetaPropsEnsemble.emptyLogDirs().forEach(logDir => {
+  throw new RuntimeException(s"No `meta.properties` found in $logDir (have 
you run `kafka-storage.sh` " +
+"to format the directory?)")
+})
+val verificationFlags = if (config.migrationEnabled) {
+  util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR)
+} else {
+  util.EnumSet.of(REQUIRE_V0, REQUIRE_AT_LEAST_ONE_VALID)
 }
+initialMetaPropsEnsemble.verify(Optional.empty(), 
OptionalInt.of(config.nodeId), verificationFlags);
 
+// Check that the __cluster_metadata-0 topic does not appear outside the 
metadata directory.
 val metadataPartitionDirName = UnifiedLog.logDirName(MetadataPartition)
-val onlineNonMetadataDirs = logDirs.diff(offlineDirs :+ 
config.metadataLogDir)
-onlineNonMetadataDirs.foreach { logDir =>
-  val metadataDir = new File(logDir, metadataPartitionDirName)
-  if (metadataDir.exists) {
-throw new KafkaException(s"Found unexpected metadata location in data 
directory `$metadataDir` " +
-  s"(the configured metadata directory is ${config.metadataLogDir}).")
+initialMetaPropsEnsemble.logDirProps().keySet().forEach(logDir => {
+  if (!logDir.equals(config.metadataLogDir)) {
+val clusterMetadataTopic = new File(logDir, metadataPartitionDirName)
+if (clusterMetadataTopic.exists) {
+  throw new KafkaException(s"Found unexpected metadata location in 
data directory `$clusterMetadataTopic` " +
+s"(the configured metadata directory is 
${config.metadataLogDir}).")
+}
   }
-}
-
-val metaProperties = MetaProperties.parse(rawMetaProperties)
-if (config.nodeId != metaProperties.nodeId) {
-  throw new InconsistentNodeIdException(
-s"Configured node.id `${config.nodeId}` doesn't match stored node.id 
`${metaProperties.nodeId}' in " +
-  "meta.properties. If you moved your data, make sure your configured 
controller.id matches. " +
-  "If you intend to create a new broker, you should remove all data in 
your data directories (log.dirs).")
-}
+})
+
+// Set directory IDs on all directories. Rewrite the files if needed.

Review Comment:
   Yeah. Maybe eventually we'll also auto-upgrade from v0 -> v1 here (once we're not 
in migration mode any more).
   
   v0 is quite annoying since there are basically no required fields at all.
   
   But one step at a time...
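   
   For illustration, here is a rough sketch (my own, not the PR's code) of what such a v0 -> v1 
auto-upgrade could look like, using plain java.util.Properties and the existing meta.properties 
keys ("version", "broker.id", "node.id"); atomic rewrite and error handling are omitted:
   
   import java.io.FileInputStream;
   import java.io.FileOutputStream;
   import java.io.IOException;
   import java.util.Properties;
   
   public final class MetaPropertiesUpgradeSketch {
       public static void upgradeToV1(String metaPropsPath) throws IOException {
           Properties props = new Properties();
           try (FileInputStream in = new FileInputStream(metaPropsPath)) {
               props.load(in);
           }
           if ("1".equals(props.getProperty("version"))) {
               return; // already v1, nothing to do
           }
           // v1 requires node.id; a v0 file may only carry broker.id.
           if (props.getProperty("node.id") == null && props.getProperty("broker.id") != null) {
               props.setProperty("node.id", props.getProperty("broker.id"));
           }
           props.setProperty("version", "1");
           try (FileOutputStream out = new FileOutputStream(metaPropsPath)) {
               props.store(out, "");
           }
       }
   }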



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373817350


##
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##
@@ -135,39 +136,52 @@ object KafkaRaftServer {
* @return A tuple containing the loaded meta properties (which are 
guaranteed to
* be consistent across all log dirs) and the offline directories
*/
-  def initializeLogDirs(config: KafkaConfig): (MetaProperties, 
BootstrapMetadata, Seq[String]) = {
-val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
-val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
-  getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false, 
kraftMode = true)
-
-if (offlineDirs.contains(config.metadataLogDir)) {
-  throw new KafkaException("Cannot start server since `meta.properties` 
could not be " +
-s"loaded from ${config.metadataLogDir}")
+  def initializeLogDirs(config: KafkaConfig): (MetaPropertiesEnsemble, 
BootstrapMetadata) = {
+// Load and verify the original ensemble.
+val loader = new MetaPropertiesEnsemble.Loader()
+loader.addMetadataLogDir(config.metadataLogDir)
+config.logDirs.foreach(loader.addLogDir(_))
+val initialMetaPropsEnsemble = loader.load()
+initialMetaPropsEnsemble.emptyLogDirs().forEach(logDir => {
+  throw new RuntimeException(s"No `meta.properties` found in $logDir (have 
you run `kafka-storage.sh` " +
+"to format the directory?)")
+})
+val verificationFlags = if (config.migrationEnabled) {
+  util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR)
+} else {
+  util.EnumSet.of(REQUIRE_V0, REQUIRE_AT_LEAST_ONE_VALID)
 }
+initialMetaPropsEnsemble.verify(Optional.empty(), 
OptionalInt.of(config.nodeId), verificationFlags);
 
+// Check that the __cluster_metadata-0 topic does not appear outside the 
metadata directory.
 val metadataPartitionDirName = UnifiedLog.logDirName(MetadataPartition)
-val onlineNonMetadataDirs = logDirs.diff(offlineDirs :+ 
config.metadataLogDir)
-onlineNonMetadataDirs.foreach { logDir =>
-  val metadataDir = new File(logDir, metadataPartitionDirName)
-  if (metadataDir.exists) {
-throw new KafkaException(s"Found unexpected metadata location in data 
directory `$metadataDir` " +
-  s"(the configured metadata directory is ${config.metadataLogDir}).")
+initialMetaPropsEnsemble.logDirProps().keySet().forEach(logDir => {
+  if (!logDir.equals(config.metadataLogDir)) {
+val clusterMetadataTopic = new File(logDir, metadataPartitionDirName)
+if (clusterMetadataTopic.exists) {
+  throw new KafkaException(s"Found unexpected metadata location in 
data directory `$clusterMetadataTopic` " +
+s"(the configured metadata directory is 
${config.metadataLogDir}).")
+}
   }
-}
-
-val metaProperties = MetaProperties.parse(rawMetaProperties)
-if (config.nodeId != metaProperties.nodeId) {
-  throw new InconsistentNodeIdException(
-s"Configured node.id `${config.nodeId}` doesn't match stored node.id 
`${metaProperties.nodeId}' in " +
-  "meta.properties. If you moved your data, make sure your configured 
controller.id matches. " +
-  "If you intend to create a new broker, you should remove all data in 
your data directories (log.dirs).")
-}
+})
+
+// Set directory IDs on all directories. Rewrite the files if needed.

Review Comment:
   Yeah. Maybe eventually we'll upgrade from v0 -> v1 (if we're not in migration mode 
any more).
   
   v0 is quite annoying since there are basically no required fields at all.
   
   But one step at a time...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]

2023-10-26 Thread via GitHub


chb2ab commented on code in PR #14627:
URL: https://github.com/apache/kafka/pull/14627#discussion_r1373815620


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -360,7 +360,9 @@ public short partitionRecordVersion() {
 }
 
 public short fetchRequestVersion() {
-if (this.isAtLeast(IBP_3_5_IV1)) {
+if (this.isAtLeast(IBP_3_7_IV0)) {

Review Comment:
   Oh, yeah, we shouldn't need any other changes; the fields are all tagged and 
not used anywhere, so we can leave them at their default values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373815484


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesVersion.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+/**
+ * The version of a meta.properties file.
+ */
+public enum MetaPropertiesVersion {
+V0(0),
+V1(1);
+
+private final int number;
+
+public static MetaPropertiesVersion fromNumberString(String numberString) {
+int number;
+try {
+number = Integer.parseInt(numberString.trim());
+} catch (NumberFormatException  e) {
+throw new RuntimeException("Invalid meta.properties version string 
'" +
+numberString + "'");
+}
+return fromNumber(number);
+}
+
+public static MetaPropertiesVersion fromNumber(int number) {
+switch (number) {
+case 0: return V0;
+case 1: return V1;
+default: throw new RuntimeException("Unknown meta.properties 
version number " + number);
+}
+}
+
+MetaPropertiesVersion(int number) {
+this.number = number;
+}
+
+public int number() {
+return number;
+}
+
+public String numberString() {
+return "" + number;
+}
+
+public boolean hasBrokerId() {
+return this == V0;
+}
+
+public boolean alwaysHasId() {

Review Comment:
   yes, let's rename it to `alwaysHasNodeId`
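   
   A small usage sketch based on the enum quoted above (assuming the rename to 
`alwaysHasNodeId` is applied):
   
   import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
   
   public class MetaPropertiesVersionExample {
       public static void main(String[] args) {
           // fromNumberString trims and parses the "version" property value.
           MetaPropertiesVersion v = MetaPropertiesVersion.fromNumberString(" 1 ");
           System.out.println(v.number());          // 1
           System.out.println(v.hasBrokerId());     // false: only V0 carries broker.id
           System.out.println(v.alwaysHasNodeId()); // true for V1 (renamed from alwaysHasId)
       }
   }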



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]

2023-10-26 Thread via GitHub


chb2ab commented on code in PR #14627:
URL: https://github.com/apache/kafka/pull/14627#discussion_r1373814111


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -53,7 +53,9 @@
   //
   // Version 15 adds the ReplicaState which includes new field ReplicaEpoch 
and the ReplicaId. Also,
   // deprecate the old ReplicaId field and set its default value to -1. 
(KIP-903)
-  "validVersions": "0-15",
+  //
+  // Version 16 is the same as version 15.
+  "validVersions": "0-16",

Review Comment:
   ok, added the label



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373813203


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -227,8 +231,12 @@ public KafkaClusterTestKit build() throws Exception {
 setupNodeDirectories(baseDirectory, 
node.metadataDirectory(), Collections.emptyList());
 BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
 fromVersion(nodes.bootstrapMetadataVersion(), 
"testkit");
+MetaPropertiesEnsemble metaPropertiesEnsemble = new 
MetaPropertiesEnsemble.Loader().
+setLoadMissingBehavior(LoadMissingBehavior.EXCEPTION).
+addMetadataLogDir(node.metadataDirectory()).

Review Comment:
   Yes, this was left over from some earlier code, sorry. Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373812595


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java:
##
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.BiConsumer;
+
+/**
+ * A collection of meta.properties information for Kafka log directories.
+ *
+ * Directories are categorized into empty, error, and normal. Each directory 
must appear in only
+ * one category, corresponding to emptyLogDirs, errorLogDirs, and logDirProps.
+ *
+ * This class is immutable. Modified copies can be made with the Copier class.
+ */
+public class MetaPropertiesEnsemble {
+/**
+ * The log4j object for this class.
+ */
+private static final Logger LOG = 
LoggerFactory.getLogger(MetaPropertiesEnsemble.class);
+
+/**
+ * A completely empty MetaPropertiesEnsemble object.
+ */
+public static final MetaPropertiesEnsemble EMPTY = new 
MetaPropertiesEnsemble(Collections.emptySet(),
+Collections.emptySet(),
+Collections.emptyMap(),
+Optional.empty());
+
+/**
+ * The name of the meta.properties file within each log directory.
+ */
+public static final String META_PROPERTIES_NAME = "meta.properties";
+
+/**
+ * The set of log dirs that were empty.
+ */
+private final Set<String> emptyLogDirs;
+
+/**
+ * The set of log dirs that had errors.
+ */
+private final Set<String> errorLogDirs;
+
+/**
+ * A map from log directories to the meta.properties information inside 
each one.
+ */
+private final Map<String, MetaProperties> logDirProps;
+
+/**
+ * The metadata log directory, or the empty string if there is none.
+ */
+private final Optional<String> metadataLogDir;
+
+/**
+ * Utility class for loading a MetaPropertiesEnsemble from the disk.
+ */
+public static class Loader {
+private final TreeSet<String> logDirs = new TreeSet<>();
+private Optional<String> metadataLogDir = Optional.empty();
+
+public Loader addLogDirs(Collection<String> logDirs) {
+for (String logDir : logDirs) {
+this.logDirs.add(logDir);
+}
+return this;
+}
+
+public Loader addLogDir(String logDir) {
+this.logDirs.add(logDir);
+return this;
+}
+
+public Loader addMetadataLogDir(String metadataLogDir) {
+if (this.metadataLogDir.isPresent()) {
+throw new RuntimeException("Cannot specify more than one 
metadata log directory. " +
+"Already specified " + this.metadataLogDir.get());
+}
+this.metadataLogDir = Optional.of(metadataLogDir);
+logDirs.add(metadataLogDir);
+return this;
+}
+
+public MetaPropertiesEnsemble load() throws IOException  {
+if (logDirs.isEmpty()) {
+throw new RuntimeException("You must specify at least one log 
directory.");
+}
+Set<String> emptyLogDirs = new HashSet<>();
+Set<String> errorLogDirs = new HashSet<>();
+Map<String, MetaProperties> logDirProps = new HashMap<>();
+for (String logDir : logDirs) {
+String metaPropsFile = new File(logDir, 
META_PROPERTIES_NAME).getAbsolutePath();
+try {
+Properties props = 
PropertiesUtils.readPropertiesFile(metaPropsFile);
+   

Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373810872


##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {
+/**
+ * Writes a Java Properties object to a file.
+ *
+ * @param props The Properties object.
+ * @param path  The file to write to.
+ * @throws IOException
+ */
+public static void writePropertiesFile(
+Properties props,
+String path
+) throws IOException {
+File tempFile = new File(path + ".tmp");
+try (
+FileOutputStream fos = new FileOutputStream(tempFile, false);
+OutputStreamWriter osw = new OutputStreamWriter(fos, 
StandardCharsets.UTF_8);
+PrintWriter pw = new PrintWriter(osw)
+) {
+props.store(pw, "");
+fos.flush();
+fos.getFD().sync();

Review Comment:
   In some filesystems, if you don't fsync the parent directory, the file can 
be lost if the machine loses power. This is again something ext3 was famous 
for: letting you sync files to disk, but then making them unreachable because 
you didn't sync the directory.
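   
   For reference, a minimal sketch (not the PR's code) of how the parent directory can be 
fsynced after an atomic rename, so the rename itself survives a power loss. Opening a 
directory read-only and calling force() works on Linux; on platforms where it throws 
(e.g. Windows) it would simply be skipped:
   
   import java.io.IOException;
   import java.nio.channels.FileChannel;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.nio.file.StandardOpenOption;
   
   public final class DirSyncSketch {
       // Flush the directory entries (e.g. a just-renamed meta.properties) to disk.
       public static void fsyncDir(Path dir) throws IOException {
           try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
               channel.force(true);
           }
       }
   
       public static void main(String[] args) throws IOException {
           fsyncDir(Paths.get("/tmp")); // example invocation
       }
   }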



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-26 Thread via GitHub


cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373808533


##
metadata/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+
+/**
+ * An immutable class which contains the per-log-directory information stored 
in an individual
+ * meta.properties file.
+ */
+public final class MetaProperties {

Review Comment:
   Hmm, good question. I guess I think it does belong here.
   
   The `:metadata` module in Gradle is about more than just the controller. 
It's about how Kafka handles metadata in general and has things like node 
registrations, metadata publishing and loading, etc. I think this is in keeping 
with that.
   
   Also, independently of all that, we should all be striving to shrink the 
`:core` module, not add more stuff there. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-26 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1373793336


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   Yes. I'm having trouble speaking today. Map in Scala is immutable. 🤦‍♀️ 
Apologies for the confusion.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   Yes. I'm having trouble speaking today. Map in Scala is immutable. 🤦‍♀️ 
Apologies for the confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15598 & KAFKA-15461: Add integration tests for DescribeGroups API, DeleteGroups API, OffsetDelete API and ListGroups API [kafka]

2023-10-26 Thread via GitHub


dongnuo123 commented on code in PR #14537:
URL: https://github.com/apache/kafka/pull/14537#discussion_r1373787330


##
core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala:
##
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import 
org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "60"),
+new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "60"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+testListGroupsWithNewProtocol()
+  }
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+testListGroupsWithOldProtocol()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+testListGroupsWithOldProtocol()
+  }
+
+  private def testListGroupsWithNewProtocol(): Unit = {
+if (!isNewGroupCoordinatorEnabled) {
+  fail("Cannot use the new protocol with the old group coordinator.")
+}
+
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+createOffsetsTopic()
+
+// Create the topic.
+createTopic(
+  topic = "foo",
+  numPartitions = 3
+)
+
+for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to 
ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) {
+
+  // Join the consumer group. Note that we don't heartbeat here so we must 
use
+  // a session long enough for the duration of the test.
+  val (memberId1, _) = joinConsumerGroup("grp", true)
+
+  checkListedGroups(
+groupId = "grp",
+state = ConsumerGroupState.STABLE.toString,
+statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.ASSIGNING.toString),
+version = version
+  )
+
+  // Member 2 joins the group, triggering a rebalance.
+  val (memberId2, _) = joinConsumerGroup("grp", true)
+
+  checkListedGroups(
+groupId = "grp",
+state = ConsumerGroupState.RECONCILING.toString,
+  

Re: [PR] KAFKA-15583: Enforce HWM advance only if partition is not under-min-ISR With ELR [kafka]

2023-10-26 Thread via GitHub


artemlivshits commented on code in PR #14594:
URL: https://github.com/apache/kafka/pull/14594#discussion_r1373758559


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1106,6 +1115,12 @@ class Partition(val topicPartition: TopicPartition,
* @return true if the HW was incremented, and false otherwise.
*/
   private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: 
Long = time.milliseconds): Boolean = {
+if (metadataCache.isInstanceOf[KRaftMetadataCache] && 
interBrokerProtocolVersion.isElrSupported && eligibleLeaderReplicasEnabled) {

Review Comment:
   I have a higher-level question -- do we actually care if ELR is enabled for 
this logic? It is my understanding that the logic on the brokers doesn't use 
ELR; it just has some changes that seem to improve the protocol even if ELR is 
not used / implemented:
   - strict min ISR
   - advance HWM only if min ISR is known to the controller (see the sketch below)
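   
   A minimal sketch of the guard being discussed (my own phrasing, independent of ELR): 
only advance the high watermark when the ISR meets min.insync.replicas.
   
   public final class HwmGuardSketch {
       // Illustrative only; the actual check is inside Partition.maybeIncrementLeaderHW.
       static boolean canAdvanceHighWatermark(int isrSize, int minInsyncReplicas) {
           return isrSize >= minInsyncReplicas;
       }
   }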



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-26 Thread via GitHub


wcarlson5 merged PR #14575:
URL: https://github.com/apache/kafka/pull/14575


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-26 Thread via GitHub


wcarlson5 commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1373750580


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+
+import java.util.Optional;
+
+/**
+ * The interface used by the `NetworkClient` to send telemetry requests.
+ */
+public interface ClientTelemetrySender extends AutoCloseable {
+
+/**
+ * Return the next time when the telemetry API should be attempted (i.e., 
interval time has elapsed).
+ * 
+ * If another telemetry API is in-flight, then {@code timeoutMs} should be 
returned as the
+ * maximum wait time.
+ *
+ * @param timeoutMs The timeout for the inflight telemetry API call.
+ * @return remaining time in ms till the telemetry API be attempted again.
+ */
+long timeToNextUpdate(long timeoutMs);
+
+/**
+ * Return the telemetry request based on client state i.e. determine if
+ * {@link 
org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or
+ * {@link org.apache.kafka.common.requests.PushTelemetryRequest} be 
constructed.
+ *
+ * @return request for telemetry API call.
+ */
+Optional<Builder<?>> createRequest();
+
+/**
+ * Handle successful response for get telemetry subscriptions request.
+ *
+ * @param response subscriptions telemetry API response
+ */
+void handleResponse(GetTelemetrySubscriptionsResponse response);
+
+/**
+ * Handle successful response for push telemetry request.
+ *
+ * @param response push telemetry API response
+ */
+void handleResponse(PushTelemetryResponse response);
+
+/**
+ * Handle response for failed get telemetry subscriptions request.
+ *
+ * @param kafkaException the fatal exception.
+ */
+void handleFailedGetTelemetrySubscriptionsRequest(KafkaException 
kafkaException);

Review Comment:
   I think this is fine as it is. The KafkaException should be broad enough.
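   
   For context, a rough sketch (an assumption, not part of the PR) of how a NetworkClient-style 
caller might drive this interface; TelemetryPollDriver and enqueue() are made-up names:
   
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.requests.AbstractRequest.Builder;
   import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
   import org.apache.kafka.common.requests.PushTelemetryResponse;
   import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
   
   final class TelemetryPollDriver {
       private final ClientTelemetrySender sender;
   
       TelemetryPollDriver(ClientTelemetrySender sender) {
           this.sender = sender;
       }
   
       void maybeSendTelemetry(long requestTimeoutMs) {
           // Ask the sender whether the telemetry interval has elapsed.
           if (sender.timeToNextUpdate(requestTimeoutMs) > 0)
               return; // not yet, or another telemetry request is in flight
           // The sender decides whether the next call is GetTelemetrySubscriptions or PushTelemetry.
           sender.createRequest().ifPresent(this::enqueue);
       }
   
       // Completed responses (or failures) are routed back to the sender.
       void onComplete(GetTelemetrySubscriptionsResponse response) {
           sender.handleResponse(response);
       }
   
       void onComplete(PushTelemetryResponse response) {
           sender.handleResponse(response);
       }
   
       void onSubscriptionsFailure(KafkaException e) {
           sender.handleFailedGetTelemetrySubscriptionsRequest(e);
       }
   
       private void enqueue(Builder<?> builder) {
           // placeholder: hand the request builder off to the network layer
       }
   }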



##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Value object that contains the name and tags for a Metric.
+ */
+public class MetricKey implements MetricKeyable {
+
+private final String name;
+private final Map<String, String> tags;
+
+/**
+ * Create a {@code MetricKey}
+ *
+ * @param name metric name. This should be the .converted. name of the 
metric (the final name
+ * under which this metric is emitted).
+ */
+public MetricKey(String name) {
+this(name, null);
+}
+
+/**
+ * Create a {@code MetricKey}
+ *
+ * @param name metric name. This should be the .converted. name of the 
metric (the final name
+ * under which this metric is emitted).
+ * @param tags mapping of tag 

Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-10-26 Thread via GitHub


alok123t commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1781801313

   @junrao From a quick look, I think the test failures are unrelated to the PR. 
I'm not sure if these are known flaky tests; I'll wait for another run 
from the latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-10-26 Thread via GitHub


alok123t commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1373738648


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+
+public class PartitionMetadata {
+
+private final int version;
+private final Uuid topicId;
+
+public PartitionMetadata(int version, Uuid topicId) {
+this.version = version;
+this.topicId = topicId;
+}
+
+public int version() {
+return version;
+}
+
+public Uuid topicId() {
+return topicId;
+}
+
+public String toText() {

Review Comment:
   Updated to encode in 
https://github.com/apache/kafka/pull/14607/commits/f2d05e64314401142d1386f031b45b42425d5a93
   
   It would be better to use the `Formatter` interface, as `LeaderEpochCheckpointFile` does; 
we can do that in a follow-up PR.
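   
   A small usage sketch of the quoted class (the printed layout is an assumption based on 
the existing partition.metadata text format):
   
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.storage.internals.checkpoint.PartitionMetadata;
   
   public class PartitionMetadataExample {
       public static void main(String[] args) {
           PartitionMetadata metadata = new PartitionMetadata(0, Uuid.randomUuid());
           // Expected to print something like:
           //   version: 0
           //   topic_id: 4BVkGEA3TXKYsihhLsHgKw
           System.out.println(metadata.toText());
       }
   }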



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-26 Thread via GitHub


apoorvmittal10 commented on PR #14621:
URL: https://github.com/apache/kafka/pull/14621#issuecomment-1781794070

   > @apoorvmittal10 : Yes, we are gradually moving the codebase to java. 
Ideally, all new classes should be written in java.
   
   Thanks @junrao, I'll update the PR by tomorrow with the new classes in Java.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15574) Update states and transitions for membership manager state machine

2023-10-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15574:
--
Description: 
This task is to update the state machine so that it correctly acts as the glue 
between the heartbeat request manager and the assignment reconciler.

The state machine will transition from one state to another as a response to 
heartbeats, callback completion, errors, unsubscribing, and other external 
events. A given transition may kick off one or more actions that are 
implemented outside of the state machine.

Steps:
 # Update the set of states in the code as [defined in the diagrams on the 
wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
 # Ensure the correct state transitions are performed as responses to external 
input
 # _Define_ any actions that should be taken as a result of the above 
transitions, but defer the _implementation_ to separate Jiras/PRs as much as 
possible

  was:
This task includes:
 # Updating the states in the code as [defined in the diagrams on the 
wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
 # Implement the state transitions as responses to heartbeat responses, 
callbacks, unsubscribe, etc.
 # _Define_ the actions that should be taken as a result of the transitions; 
defer the _implementation_ of as many actions as reasonably possible to 
separate Jiras/PRs 

The state machine will work to tie the heartbeat request manager and the 
partition assignment reconciler together. The idea is to call the partition assignment 
reconciler from the heartbeat request manager, making sure to correctly query 
the state machine for the right actions.

 
The HB-reconciler interaction is two-fold:
 * HB should send HB req when the reconciler completes callbacks
 * HB manager needs to trigger the reconciler to release assignments when 
errors occur.

All driven by the HB manager.


> Update states and transitions for membership manager state machine
> --
>
> Key: KAFKA-15574
> URL: https://issues.apache.org/jira/browse/KAFKA-15574
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> This task is to update the state machine so that it correctly acts as the 
> glue between the heartbeat request manager and the assignment reconciler.
> The state machine will transition from one state to another as a response to 
> heartbeats, callback completion, errors, unsubscribing, and other external 
> events. A given transition may kick off one or more actions that are 
> implemented outside of the state machine.
> Steps:
>  # Update the set of states in the code as [defined in the diagrams on the 
> wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
>  # Ensure the correct state transitions are performed as responses to 
> external input
>  # _Define_ any actions that should be taken as a result of the above 
> transitions, but defer the _implementation_ to separate Jiras/PRs as much as 
> possible



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15574) Update states and transitions for membership manager state machine

2023-10-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15574:
--
Summary: Update states and transitions for membership manager state machine 
 (was: Implement heartbeat membership manager state machine)

> Update states and transitions for membership manager state machine
> --
>
> Key: KAFKA-15574
> URL: https://issues.apache.org/jira/browse/KAFKA-15574
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> This task includes:
>  # Updating the states in the code as [defined in the diagrams on the 
> wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
>  # Implement the state transitions as responses to heartbeat responses, 
> callbacks, unsubscribe, etc.
>  # _Define_ the actions that should be taken as a result of the transitions; 
> defer the _implementation_ of as many actions as reasonably possible to 
> separate Jiras/PRs 
> The state machine will work to tie the heartbeat request manager and the 
> partition assignment reconciler together. The idea is to call the partition 
> assignment reconciler from the heartbeat request manager, making sure to 
> correctly query the state machine for the right actions.
>  
> The HB-reconciler interaction is two-fold:
>  * HB should send HB req when the reconciler completes callbacks
>  * HB manager needs to trigger the reconciler to release assignments when 
> errors occur.
> All driven by the HB manager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15574) Implement heartbeat membership manager state machine

2023-10-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15574:
--
Description: 
This task includes:
 # Updating the states in the code as [defined in the diagrams on the 
wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
 # Implement the state transitions as responses to heartbeat responses, 
callbacks, unsubscribe, etc.
 # _Define_ the actions that should be taken as a result of the transitions; 
defer the _implementation_ of as many actions as reasonably possible to 
separate Jiras/PRs 

The state machine will work to tie the heartbeat request manager and the 
partition assignment reconciler together. The idea is to call the partition assignment 
reconciler from the heartbeat request manager, making sure to correctly query 
the state machine for the right actions.

 
The HB-reconciler interaction is two-fold:
 * HB should send HB req when the reconciler completes callbacks
 * HB manager needs to trigger the reconciler to release assignments when 
errors occur.

All driven by the HB manager.

  was:
This task is to call the partition assignment reconciler from the heartbeat 
request manager, making sure to correctly query the state machine for the right 
actions.

 
The HB-reconciler interaction is two-fold:
 * HB should send HB req when the reconciler completes callbacks
 * HB manager needs to trigger the reconciler to release assignments when 
errors occur.

All driven by the HB manager.


> Implement heartbeat membership manager state machine
> 
>
> Key: KAFKA-15574
> URL: https://issues.apache.org/jira/browse/KAFKA-15574
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> This task includes:
>  # Updating the states in the code as [defined in the diagrams on the 
> wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
>  # Implement the state transitions as responses to heartbeat responses, 
> callbacks, unsubscribe, etc.
>  # _Define_ the actions that should be taken as a result of the transitions; 
> defer the _implementation_ of as many actions as reasonably possible to 
> separate Jiras/PRs 
> The state machine will work to tie the heartbeat request manager and the 
> partition assignment reconciler together. The idea is to call the partition 
> assignment reconciler from the heartbeat request manager, making sure to 
> correctly query the state machine for the right actions.
>  
> The HB-reconciler interaction is two-fold:
>  * HB should send HB req when the reconciler completes callbacks
>  * HB manager needs to trigger the reconciler to release assignments when 
> errors occur.
> All driven by the HB manager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15574) Implement heartbeat membership manager state machine

2023-10-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15574:
--
Summary: Implement heartbeat membership manager state machine  (was: 
Integrate partition assignment reconciliation with heartbeat request manager)

> Implement heartbeat membership manager state machine
> 
>
> Key: KAFKA-15574
> URL: https://issues.apache.org/jira/browse/KAFKA-15574
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> This task is to call the partition assignment reconciler from the heartbeat 
> request manager, making sure to correctly query the state machine for the 
> right actions.
>  
> The HB-reconciler interaction is two-fold:
>  * HB should send HB req when the reconciler completes callbacks
>  * HB manager needs to trigger the reconciler to release assignments when 
> errors occur.
> All driven by the HB manager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

