Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-08 Thread via GitHub


C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1556834149


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently,
+ * instead of multiple individual {@link SingleFieldPath single-field paths}.
+ *
+ * If the SMT requires accessing a single field on the same data object,
+ * use {@link SingleFieldPath} instead.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see SingleFieldPath
+ * @see FieldSyntaxVersion
+ */
+public class MultiFieldPaths {
+    final Trie trie = new Trie();
+
+    MultiFieldPaths(Set<SingleFieldPath> paths) {
+        paths.forEach(trie::insert);
+    }
+
+    public static MultiFieldPaths of(List<String> fields, FieldSyntaxVersion syntaxVersion) {
+        return new MultiFieldPaths(fields.stream()
+            .map(f -> new SingleFieldPath(f, syntaxVersion))
+            .collect(Collectors.toSet()));
+    }
+
+    /**
+     * Find values at the field paths
+     *
+     * @param struct data value
+     * @return map of field paths and field/values
+     */
+    public Map<SingleFieldPath, Map.Entry<Field, Object>> fieldAndValuesFrom(Struct struct) {
+        if (trie.isEmpty()) return Collections.emptyMap();
+        return findFieldAndValues(struct, trie.root, new HashMap<>());
+    }
+
+    private Map<SingleFieldPath, Map.Entry<Field, Object>> findFieldAndValues(
+        Struct originalValue,
+        TrieNode trieAt,
+        Map<SingleFieldPath, Map.Entry<Field, Object>> fieldAndValueMap
+    ) {
+        for (Map.Entry<String, TrieNode> step : trieAt.steps().entrySet()) {
+            Field field = originalValue.schema().field(step.getKey());
+            if (step.getValue().isLeaf()) {
+                Map.Entry<Field, Object> fieldAndValue =
+                    field != null
+                        ? new AbstractMap.SimpleImmutableEntry<>(field, originalValue.get(field))
+                        : null;
+                fieldAndValueMap.put(step.getValue().path, fieldAndValue);
+            } else {
+                if (field.schema().type() == Type.STRUCT) {
+                    findFieldAndValues(
+                        originalValue.getStruct(field.name()),
+                        step.getValue(),
+                        fieldAndValueMap
+                    );
+                }
+            }
+        }
+        return fieldAndValueMap;
+    }
+
+    /**
+     * Find values at the field paths
+     *
+     * @param value data value
+     * @return map of field paths and field/values
+     */
+    public Map<SingleFieldPath, Map.Entry<String, Object>> fieldAndValuesFrom(Map<String, Object> value) {
+        if (trie.isEmpty()) return Collections.emptyMap();
+        return findFieldAndValues(value, trie.root, new HashMap<>());
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<SingleFieldPath, Map.Entry<String, Object>> findFieldAndValues(
+        Map<String, Object> value,
+        TrieNode trieAt,
+        Map<SingleFieldPath, Map.Entry<String, Object>> fieldAndValueMap
+    ) {
+        for (Map.Entry<String, TrieNode> step : trieAt.steps().entrySet()) {
+            Object fieldValue = value.get(step.getKey());
+            if (step.getValue().isLeaf()) {
+                fieldAndValueMap.put(
+                    step.getValue().path,
+                    new AbstractMap.SimpleImmutableEntry<>(step.getKey(), fieldValue)
+                );
+            } else {
+                if (fieldValue instanceof Map) {
+                    findFieldAndValues(
+                        (Map<String, Object>) fieldValue,
+                        step.getValue(),
+                        fieldAndValueMap
+  

Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-08 Thread via GitHub


C0urante commented on PR #15379:
URL: https://github.com/apache/kafka/pull/15379#issuecomment-2044091861

   I'm also wondering if it's necessary to have the `SingleFieldPath` class at 
all. Would it be significantly more expensive to just use the `MultiFieldPaths` 
class for everything for now? It'd reduce the complexity of the implementation 
and probably make review easier.
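
   For context, a rough usage sketch of the two classes from the diff above. The record contents, the `FieldSyntaxVersion.V2` constant, and the dotted-path strings are illustrative assumptions, not code taken from the PR:

   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.kafka.connect.transforms.field.FieldSyntaxVersion;
   import org.apache.kafka.connect.transforms.field.MultiFieldPaths;
   import org.apache.kafka.connect.transforms.field.SingleFieldPath;

   public class FieldPathUsageSketch {
       public static void main(String[] args) {
           // A schemaless record value, as an SMT would see it.
           Map<String, Object> address = new HashMap<>();
           address.put("city", "Berlin");
           address.put("zip", "10115");
           Map<String, Object> value = new HashMap<>();
           value.put("name", "alice");
           value.put("address", address);

           // One nested field, resolved individually.
           SingleFieldPath cityPath = new SingleFieldPath("address.city", FieldSyntaxVersion.V2);

           // Several fields, resolved in a single walk over the value.
           MultiFieldPaths paths = MultiFieldPaths.of(
               Arrays.asList("address.city", "address.zip", "name"), FieldSyntaxVersion.V2);
           Map<SingleFieldPath, Map.Entry<String, Object>> found = paths.fieldAndValuesFrom(value);
           found.forEach((path, fieldAndValue) ->
               System.out.println(fieldAndValue.getKey() + " = " + fieldAndValue.getValue()));
       }
   }
   ```

   If `MultiFieldPaths` were used everywhere, the single-path case above would simply be a one-element list, which is roughly what this comment suggests.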


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-08 Thread via GitHub


C0urante commented on PR #15379:
URL: https://github.com/apache/kafka/pull/15379#issuecomment-2044089149

   @jeqo This is difficult to review without seeing how this code is actually 
used. Maybe we could move incrementally and introduce a commit that only 
touches on 1-3 SMTs, and only introduces the internal changes (i.e., methods in 
the `SingleFieldPath` and `MultiFieldPaths` classes) necessary in order to 
touch on those SMTs?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-08 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1556799227


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> {
+return null;
+});
+
+long now = time.milliseconds();
+// We keep the heartbeat thread running behind the scenes and poll 
frequently so that eventually
+// the time goes past now + rebalanceTimeoutMs which triggers poll 
timeout expiry.
+TestUtils.waitForCondition(() -> {
+time.sleep(heartbeatIntervalMs - 1);
+return time.milliseconds() > now + rebalanceTimeoutMs;
+}, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for 
rebalance.timeout.ms");
+coordinator.poll(0, () -> {

Review Comment:
   1. You didn't provide a `HeartbeatResponse`, so it'll hit the session timeout.
   2. `heartbeatIntervalMs` is the minimum interval at which heartbeats should be sent, but the real timeout for the heartbeat is the session timeout, so we can sleep `sessionTimeoutMs - 1` to reach `rebalanceTimeoutMs` faster.
   3. The last poll doesn't make sense because the poll timeout should already have been triggered. Why do we need it?
   
   What I would write is something like this, FYR:
   ```
   public void testPollTimeoutExpiry() throws InterruptedException {
   
   when(configStorage.snapshot()).thenReturn(configState1);
   
   
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
   coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
   
   client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
   
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
   Collections.singletonList(taskId1x0), Errors.NONE));
   
   // prepare 3 heartBeatResponses because we will trigger 3 heartBeat 
requests until rebalanceTimeout,
   // that is (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs
   client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
   client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
   client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
   
   try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
   coordinator.ensureActiveGroup();
   coordinator.poll(0, () -> {
   return null;
   });
   
   // We keep the heartbeat thread running behind the scenes and 
poll frequently so that eventually
   // the time goes past now + rebalanceTimeoutMs which triggers 
poll timeout expiry.
   TestUtils.waitForCondition(() -> {
   // sleep until sessionTimeoutMs to trigger a heartBeat 
request to avoid session timeout.
   // Not sure if this will be flaky in CI because the 
heartbeat thread might not send out the heartBeat request in time.
   time.sleep(sessionTimeoutMs - 1);
   return 

Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-08 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1556799227


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> {
+return null;
+});
+
+long now = time.milliseconds();
+// We keep the heartbeat thread running behind the scenes and poll 
frequently so that eventually
+// the time goes past now + rebalanceTimeoutMs which triggers poll 
timeout expiry.
+TestUtils.waitForCondition(() -> {
+time.sleep(heartbeatIntervalMs - 1);
+return time.milliseconds() > now + rebalanceTimeoutMs;
+}, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for 
rebalance.timeout.ms");
+coordinator.poll(0, () -> {

Review Comment:
   1. You didn't provide a `HeartbeatResponse`, so it'll hit the session timeout.
   2. `heartbeatIntervalMs` is the minimum interval at which heartbeats should be sent, but the real timeout for the heartbeat is the session timeout, so we can sleep `sessionTimeoutMs - 1` to reach `rebalanceTimeoutMs` faster.
   3. The last poll doesn't make sense because the poll timeout should already have been triggered. Why do we need it?
   
   What I would write is something like this, FYR:
   ```
   public void testPollTimeoutExpiry() throws InterruptedException {
   
   when(configStorage.snapshot()).thenReturn(configState1);
   
   
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
   coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
   
   client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
   
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
   Collections.singletonList(taskId1x0), Errors.NONE));
   
   // prepare 3 heartBeatResponses because we will trigger 3 heartBeat 
requests until rebalanceTimeout,
   // that is (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs
   client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
   client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
   client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
   
   try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
   coordinator.ensureActiveGroup();
   coordinator.poll(0, () -> {
   return null;
   });
   
   // We keep the heartbeat thread running behind the scenes and 
poll frequently so that eventually
   // the time goes past now + rebalanceTimeoutMs which triggers 
poll timeout expiry.
   TestUtils.waitForCondition(() -> {
   // sleep until sessionTimeoutMs to trigger a heartBeat 
request to avoid session timeout.
   // Not sure if this will be flaky in CI because the 
heartbeat thread might not send out the heartBeat request in time.
   time.sleep(sessionTimeoutMs - 1);
   return 

Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-08 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1556799227


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> {
+return null;
+});
+
+long now = time.milliseconds();
+// We keep the heartbeat thread running behind the scenes and poll 
frequently so that eventually
+// the time goes past now + rebalanceTimeoutMs which triggers poll 
timeout expiry.
+TestUtils.waitForCondition(() -> {
+time.sleep(heartbeatIntervalMs - 1);
+return time.milliseconds() > now + rebalanceTimeoutMs;
+}, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for 
rebalance.timeout.ms");
+coordinator.poll(0, () -> {

Review Comment:
   1. You didn't provide a `HeartbeatResponse`, so it'll hit the session timeout.
   2. `heartbeatIntervalMs` is the minimum interval at which heartbeats should be sent, but the real timeout for the heartbeat is the session timeout, so we can sleep `sessionTimeoutMs - 1` to reach `rebalanceTimeoutMs` faster.
   3. The last poll doesn't make sense because the poll timeout should already have been triggered. Why do we need it?
   
   What I would write is something like this, FYR:
   ```
   public void testPollTimeoutExpiry() throws InterruptedException {
   
   when(configStorage.snapshot()).thenReturn(configState1);
   
   
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
   coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
   
   client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
   
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
   Collections.singletonList(taskId1x0), Errors.NONE));
   
   // prepare 3 heartBeatResponses because we will trigger 3 heartBeat 
requests until rebalanceTimeout,
   // that is (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs
   client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
   client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
   client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
   
   try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
   coordinator.ensureActiveGroup();
   System.out.println("!!! poll");
   coordinator.poll(0, () -> {
   return null;
   });
   
   // We keep the heartbeat thread running behind the scenes and 
poll frequently so that eventually
   // the time goes past now + rebalanceTimeoutMs which triggers 
poll timeout expiry.
   TestUtils.waitForCondition(() -> {
   // sleep until sessionTimeoutMs to trigger a heartBeat 
request to avoid session timeout.
   // Not sure if this will be flaky in CI because the 
heartbeat thread might not send out the heartBeat request in time.
   

[jira] [Resolved] (KAFKA-16455) Check partition exists before send reassignments to server in ReassignPartitionsCommand

2024-04-08 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16455.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Check partition exists before send reassignments to server in 
> ReassignPartitionsCommand
> ---
>
> Key: KAFKA-16455
> URL: https://issues.apache.org/jira/browse/KAFKA-16455
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Minor
> Fix For: 3.8.0
>
>
> Currently, when executing {{kafka-reassign-partitions.sh}} with the 
> {{--execute}} option, if a partition number specified in the JSON file does 
> not exist, this check occurs only when submitting the reassignments to 
> {{alterPartitionReassignments}} on the server-side.
> We can perform this check in advance before submitting the reassignments to 
> the server side.
> For example, suppose we have three brokers with IDs 1001, 1002, and 1003, and 
> a topic named {{first_topic}} with only three partitions. And execute 
> {code:bash}
> bin/kafka-reassign-partitions.sh 
>   --bootstrap-server 192.168.0.128:9092 
>   --reassignment-json-file reassignment.json 
>   --execute
> {code}
> Where reassignment.json contains
> {code:json}
> {
>   "version": 1,
>   "partitions": [
> {
>   "topic": "first_topic",
>   "partition": 20,
>   "replicas": [1002, 1001, 1003],
>   "log_dirs": ["any", "any", "any"]
> }
>   ]
> }
> {code}
> The console outputs
> {code:java}
> Current partition replica assignment
> {"version":1,"partitions":[]}
> Save this to use as the --reassignment-json-file option during rollback
> Error reassigning partition(s):
> first_topic-20: The partition does not exist.
> {code}
> Apart from the output {{\{"version":1,"partitions":[]\}}}, which doesn't provide much help,
> the error {{first_topic-20: The partition does not exist.}} is reported back to the tool from
> the server side, as mentioned earlier. This check could be moved earlier, before sending the
> reassignments to the server side.
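
For illustration, a minimal sketch of how such a client-side check could look with the Admin client; `validatePartitionsExist` and its arguments are hypothetical names and not taken from the actual fix:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;

public class ReassignmentValidationSketch {
    // Hypothetical helper: fail fast if any requested partition does not exist,
    // before calling admin.alterPartitionReassignments(...).
    static void validatePartitionsExist(Admin admin, Set<TopicPartition> requested) throws Exception {
        Set<String> topics = requested.stream().map(TopicPartition::topic).collect(Collectors.toSet());
        Map<String, TopicDescription> descriptions = admin.describeTopics(topics).allTopicNames().get();
        Set<TopicPartition> missing = requested.stream()
            .filter(tp -> tp.partition() >= descriptions.get(tp.topic()).partitions().size())
            .collect(Collectors.toSet());
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("The partition(s) " + missing + " do not exist.");
        }
    }
}
```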



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]

2024-04-08 Thread via GitHub


showuon merged PR #15659:
URL: https://github.com/apache/kafka/pull/15659


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]

2024-04-08 Thread via GitHub


showuon commented on PR #15659:
URL: https://github.com/apache/kafka/pull/15659#issuecomment-2043887866

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556614475


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -109,7 +109,7 @@ class AddPartitionsToTxnManager(
 .setTransactionalId(transactionalId)
 .setProducerId(producerId)
 .setProducerEpoch(producerEpoch)
-.setVerifyOnly(true)
+.setVerifyOnly(supportedOperation != addPartition)

Review Comment:
   nit -- we should also add the supportedOperation in KafkaApis and/or the 
other files based on the request version. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-08 Thread via GitHub


junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1556593959


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will 
be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the 
log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset 
metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
_localLogStartOffset) {

Review Comment:
   Hmm, when will we set the HWM to be lower than `_localLogStartOffset`?
   
   In `UnifiedLog.deletableSegments()`, we have the following code that bounds retention-based deletion by the high watermark, and when updating the high watermark the value typically increases:
   `val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)`

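   As a side note, a minimal sketch (in Java, with made-up names) of the bounding described in the updated javadoc, i.e. clamping the proposed high watermark to the range [local-log-start-offset, log-end-offset]:

   ```java
   // Illustrative only: clamp a proposed high watermark into
   // [localLogStartOffset, logEndOffset], mirroring the updated javadoc above.
   static long boundHighWatermark(long proposed, long localLogStartOffset, long logEndOffset) {
       if (proposed < localLogStartOffset) {
           return localLogStartOffset; // lower bound: local log start offset
       }
       return Math.min(proposed, logEndOffset); // upper bound: log end offset
   }
   ```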


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556609642


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -979,6 +1013,13 @@ void handleCoordinatorReady() {
 null;
 this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null &&
 initProducerIdVersion.maxVersion() >= 3;
+
+// TODO(caliu) use feature version.
+ApiVersion produceVersion = nodeApiVersions != null ?
+nodeApiVersions.apiVersion(ApiKeys.PRODUCE) :
+null;
+this.coordinatorSupportsTransactionV2 = produceVersion != null &&

Review Comment:
   We should also be checking the TV on the various requests and making sure we 
check the epoch when we update the cluster's latest TV.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556603619


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 } else if (!authHelper.authorize(request.context, READ, GROUP, 
txnOffsetCommitRequest.data.groupId)) {
   
sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
   CompletableFuture.completedFuture[Unit](())
+} else if (!metadataCache.metadataVersion().isTransactionV2Enabled && 
txnOffsetCommitRequest.isTransactionV2Requested) {
+  // If the client requests to use transaction V2 but the server side does not support it, return unsupported version.

Review Comment:
   This was not in the design. If we send a request version the server can 
handle, we can handle it.
   
   Ie --> if ApiVersions advertises TV 1, then the server has the code to 
handle V1, and we send the new request version. There are two cases where 
`!metadataCache.metadataVersion().isTransactionV2Enabled && 
txnOffsetCommitRequest.isTransactionV2Requested` is true. 
   
   1) TV is downgraded only (no image changes) -- in this case, we can still 
handle the old request and we should do so. 
   2) TV is downgraded + image version is downgraded. In this case, when the 
server receives v5 request, it will return unsupported version automatically 
since it doesn't recognize this version.
   
   The reason we do this is there is no way to guarantee that downgrades happen 
immediately due to the way ApiVersions requests propagate. (The only way to 
ensure it happens is to restart a broker) Thus, we took this strategy:
   
   > The downgrade case is a bit different. When we downgrade TV, it is 
possible to not receive an update communicating this from any broker for a long 
time. We could even start rolling an incompatible image to the cluster. Once we 
do this roll however, the brokers will reconnect and update the TV with the 
newest epoch. As we are checking the TV on every request, we can abort the 
transaction and restart with the new epoch of TV and the old protocol. However, 
in the edge case where we somehow send a request to an older image broker, we 
know that the new protocol is gated by the Produce/TxnOffsetCommit and 
AddPartitionsToTxn versions. If we encounter a broker that is unable to handle 
the protocol, it is also unable to handle the request version. In this case, we 
will return UnsupportedVersionException which is fatal to the client. In most 
cases, we shouldn’t hit this scenario.
   
   We also chose this approach as to not cause flip-flopping during the upgrade 
case. 
   



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 } else if (!authHelper.authorize(request.context, READ, GROUP, 
txnOffsetCommitRequest.data.groupId)) {
   
sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
   CompletableFuture.completedFuture[Unit](())
+} else if (!metadataCache.metadataVersion().isTransactionV2Enabled && 
txnOffsetCommitRequest.isTransactionV2Requested) {
+  // If the client requests to use transaction V2 but the server side does not support it, return unsupported version.

Review Comment:
   This was not in the design. If we send a request version the server can 
handle, we can handle it.
   
   Ie --> if ApiVersions advertises TV 1, then the server has the code to 
handle V1, and we send the new request version. There are two cases where 
`!metadataCache.metadataVersion().isTransactionV2Enabled && 
txnOffsetCommitRequest.isTransactionV2Requested` is true. 
   
   1) TV is downgraded only (no image changes) -- in this case, we can still 
handle the old request and we should do so. 
   2) TV is downgraded + image version is downgraded. In this case, when the 
server receives v5 request, it will return unsupported version automatically 
since it doesn't recognize this version.
   
   The reason we do this is there is no way to guarantee that downgrades happen 
immediately due to the way ApiVersions requests propagate. (The only way to 
ensure it happens immediately is to restart a broker) Thus, we took this 
strategy:
   
   > The downgrade case is a bit different. When we downgrade TV, it is 
possible to not receive an update communicating this from any broker for a long 
time. We could even start rolling an incompatible image to the cluster. Once we 
do this roll however, the brokers will reconnect and update the TV with the 
newest epoch. As we are checking the TV on every request, we can abort the 
transaction and restart with the new epoch of TV and the old protocol. However, 
in the edge case where we somehow send a request to an older image broker, we 
know that the new protocol is gated by the Produce/TxnOffsetCommit and 
AddPartitionsToTxn 

Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556603619


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 } else if (!authHelper.authorize(request.context, READ, GROUP, 
txnOffsetCommitRequest.data.groupId)) {
   
sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
   CompletableFuture.completedFuture[Unit](())
+} else if (!metadataCache.metadataVersion().isTransactionV2Enabled && 
txnOffsetCommitRequest.isTransactionV2Requested) {
+  // If the client requests to use transaction V2 but the server side does not support it, return unsupported version.

Review Comment:
   This was not in the design. If we send a request version the server can 
handle, we can handle it.
   
   Ie --> if ApiVersions advertises TV 1, then the server has the code to 
handle V1, and we send the new request version. There are two cases where 
`!metadataCache.metadataVersion().isTransactionV2Enabled && 
txnOffsetCommitRequest.isTransactionV2Requested` is true. 
   
   1) TV is downgraded -- in this case, we can still handle the old request and 
we should do so. 
   2) TV is downgraded + image version is downgraded. In this case, when the 
server receives v5 request, it will return unsupported version automatically 
since it doesn't recognize this version.
   
   The reason we do this is there is no way to guarantee that downgrades happen 
immediately due to the way ApiVersions requests propagate. (The only way to 
ensure it happens is to restart a broker) Thus, we took this strategy:
   
   > The downgrade case is a bit different. When we downgrade TV, it is 
possible to not receive an update communicating this from any broker for a long 
time. We could even start rolling an incompatible image to the cluster. Once we 
do this roll however, the brokers will reconnect and update the TV with the 
newest epoch. As we are checking the TV on every request, we can abort the 
transaction and restart with the new epoch of TV and the old protocol. However, 
in the edge case where we somehow send a request to an older image broker, we 
know that the new protocol is gated by the Produce/TxnOffsetCommit and 
AddPartitionsToTxn versions. If we encounter a broker that is unable to handle 
the protocol, it is also unable to handle the request version. In this case, we 
will return UnsupportedVersionException which is fatal to the client. In most 
cases, we shouldn’t hit this scenario.
   
   We also chose this approach as to not cause flip-flopping during the upgrade 
case. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556603619


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 } else if (!authHelper.authorize(request.context, READ, GROUP, 
txnOffsetCommitRequest.data.groupId)) {
   
sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
   CompletableFuture.completedFuture[Unit](())
+} else if (!metadataCache.metadataVersion().isTransactionV2Enabled && 
txnOffsetCommitRequest.isTransactionV2Requested) {
+  // If the client requests to use transaction V2 but the server side does not support it, return unsupported version.

Review Comment:
   This was not in the design. If we send a request version the server can 
handle, we can handle it.
   
   Ie --> if ApiVersions advertises TV 1, then the server has the code to 
handle V1, and we send the new request version. There are two cases where 
`!metadataCache.metadataVersion().isTransactionV2Enabled && 
txnOffsetCommitRequest.isTransactionV2Requested` is true. 
   
   1) TV is downgraded -- in this case, we can still handle the old request and 
we should do so. 
   2) TV is downgraded + image version is downgraded. In this case, when the 
server receives v5 request, it will return unsupported version automatically 
since it doesn't recognize this version.
   
   The reason we do this is there is no way to guarantee that downgrades happen 
immediately due to the way ApiVersions requests propagate. (The only way to 
ensure it happens is to restart a broker) Thus, we took this strategy:
   
   > The downgrade case is a bit different. When we downgrade TV, it is 
possible to not receive an update communicating this from any broker for a long 
time. We could even start rolling an incompatible image to the cluster. Once we 
do this roll however, the brokers will reconnect and update the TV with the 
newest epoch. As we are checking the TV on every request, we can abort the 
transaction and restart with the new epoch of TV and the old protocol. However, 
in the edge case where we somehow send a request to an older image broker, we 
know that the new protocol is gated by the Produce/TxnOffsetCommit and 
AddPartitionsToTxn versions. If we encounter a broker that is unable to handle 
the protocol, it is also unable to handle the request version. In this case, we 
will return UnsupportedVersionException which is fatal to the client. In most 
cases, we shouldn’t hit this scenario.
   
   We also chose this approach as to not cause flip-flopping during the upgrade 
case. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Revert to Gradle 8.5 [DO NOT MERGE YET] [kafka]

2024-04-08 Thread via GitHub


pasharik commented on PR #15553:
URL: https://github.com/apache/kafka/pull/15553#issuecomment-2043855618

   > any updates here? gradle 8.7 has been released. we can update gradle to 
8.7 if the issue is nonexistent
   
   Update to 8.7 is ok with me. For now, I'm compiling my test with `scalac` as 
a workaround anyway


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556584820


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -979,6 +1013,13 @@ void handleCoordinatorReady() {
 null;
 this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null &&
 initProducerIdVersion.maxVersion() >= 3;
+
+// TODO(caliu) use feature version.
+ApiVersion produceVersion = nodeApiVersions != null ?
+nodeApiVersions.apiVersion(ApiKeys.PRODUCE) :
+null;
+this.coordinatorSupportsTransactionV2 = produceVersion != null &&

Review Comment:
   I think the other thing we are looking for is in the txn offset commit 
request and the produce request, setting the version to 11 and 4 until 
v2/feature version is enabled. I didn't see that here yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556576980


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -402,6 +412,30 @@ public synchronized void maybeAddPartition(TopicPartition 
topicPartition) {
 }
 }
 
+public synchronized void maybeHandlePartitionAdded(TopicPartition 
topicPartition) {

Review Comment:
   For my understanding, this was the previous handling for the add partition 
call, but since add partition is now part of the produce request, we have a 
separate block?
   
   I wonder if we should change some of these error messages to reflect that 
the record was not written either?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [No Review] Kafka-14563 [kafka]

2024-04-08 Thread via GitHub


jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556556192


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -202,7 +202,9 @@ public enum MetadataVersion {
 IBP_3_7_IV4(19, "3.7", "IV4", false),
 
 // Add ELR related supports (KIP-966).
-IBP_3_8_IV0(20, "3.8", "IV0", true);
+IBP_3_8_IV0(20, "3.8", "IV0", true),
+
+IBP_100_1_IV0(100, "100.1", "IV0", false);

Review Comment:
   hehe this is a unique way to have a placeholder for feature version :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-04-08 Thread via GitHub


jolshan opened a new pull request, #15685:
URL: https://github.com/apache/kafka/pull/15685

   As part of KIP-1022, I have created an interface for all the new features to 
be used when parsing the command line arguments, doing validations, getting 
default versions, etc. 
   
   I've also added the `--feature` flag to the storage tool to show how it will 
be used. 
   
   Created a TestFeatureVersion to show an implementation of the interface 
(besides MetadataVersion which is unique) and added tests using this new test 
feature.
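   
   As a rough illustration of the kind of interface described above (the names and methods below are hypothetical, not necessarily what the PR adds):
   
   ```java
   // Hypothetical sketch of a feature-version abstraction that a tool such as the
   // storage tool could use when parsing `--feature` arguments.
   public interface FeatureVersion {
       String featureName();   // e.g. "test.feature.version" (illustrative)
       short featureLevel();   // the numeric level this instance represents
       short defaultLevel();   // default level when none is given on the command line
       // Throw if this level is not compatible with the given metadata version level.
       void validate(short metadataVersionLevel);
   }
   ```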
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-08 Thread via GitHub


junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2043792883

   @chia7712 : Thanks for the updated PR. The code looks good to me. There were 
50 failed tests. Is any of them related to the PR? If not, have they all been 
tracked?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16485: Broker metrics to follow kebab/hyphen case (KIP-714) [kafka]

2024-04-08 Thread via GitHub


junrao commented on PR #15680:
URL: https://github.com/apache/kafka/pull/15680#issuecomment-2043781090

   @apoorvmittal10 : Thanks for the PR. The code looks good to me. Have all the 
test failures been tracked?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556505842


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = 
"transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number 
of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = 
"transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 
1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The 
transaction topic segment bytes should be kept relatively small in order to 
facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = 
"transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 
3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = 
"The replication factor for the transaction topic (set higher to ensure 
availability). " +
+"Internal topic creation will fail until the cluster size meets 
this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = 
"transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " 
+ 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
 + " config for the transaction topic.";

Review Comment:
   This is a good point. I rephrased it in a way that doesn't rely on referring to `TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556496340


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {

Review Comment:
   > Why do we need this? We usually don't add it if the config class is just to group the props, docs and default values.
   
   that is used to avoid creating an instance, and that is a guideline from the `Effective Java` book
   
   anyway, that is just a code style suggestion, so it is fine to ignore it :)
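   
   For reference, a minimal sketch of the idiom being suggested (class and constant names are illustrative):
   
   ```java
   // A constants-only holder: the private constructor prevents instantiation,
   // per the Effective Java guideline mentioned above.
   public final class ExampleConfig {
       public static final String SOME_CONFIG = "some.config";
       public static final int SOME_CONFIG_DEFAULT = 42;
   
       private ExampleConfig() { }
   }
   ```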



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16485: Broker metrics to follow kebab/hyphen case (KIP-714) [kafka]

2024-04-08 Thread via GitHub


apoorvmittal10 commented on PR #15680:
URL: https://github.com/apache/kafka/pull/15680#issuecomment-2043749614

   > @apoorvmittal10 Haven’t we already released those metrics in 3.7?
   
   @dajac There were some work items left to complete for 3.8. These broker metrics were not part of the 3.7 release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556491908


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = 
"transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number 
of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = 
"transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 
1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The 
transaction topic segment bytes should be kept relatively small in order to 
facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = 
"transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 
3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = 
"The replication factor for the transaction topic (set higher to ensure 
availability). " +
+"Internal topic creation will fail until the cluster size meets 
this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = 
"transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " 
+ 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
 + " config for the transaction topic.";

Review Comment:
   > If we want to remove the deps on TopicConfig we can hard code the value of 
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG in doc but I personally would prefer 
that we keep the ref to the config.
   
   Not sure whether we ought to highlight the "override". The other similar configs, for example `offsets.topic.replication.factor`, do not mention that "this" config overrides "that". Instead, it just says `The replication factor for the offsets topic ...`, and that is good enough to understand the purpose of the config.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1556491449


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -88,10 +125,10 @@ public OffsetConfig(int maxMetadataSize,
 }
 
 public OffsetConfig() {

Review Comment:
   deleted 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1556490595


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,53 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {
-public static final int DEFAULT_MAX_METADATA_SIZE = 4096;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
-public static final long DEFAULT_OFFSET_RETENTION_MS = 24 * 60 * 60 * 
1000L;
-public static final long DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS = 
60L;
-public static final int DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS = 50;
-public static final int DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES = 100 * 1024 * 
1024;
-public static final short DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR = 3;
-public static final CompressionType DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE 
= CompressionType.NONE;
-public static final int DEFAULT_OFFSET_COMMIT_TIMEOUT_MS = 5000;
-public static final short DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS = -1;
+public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = 
"offset.metadata.max.bytes";
+public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096;
+public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum 
size for a metadata entry associated with an offset commit.";
+
+public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = 
"offsets.load.buffer.size";
+public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
+public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for 
reading from the offsets segments when loading offsets into the cache 
(soft-limit, overridden if records are too large).";
+
+public static final String OFFSETS_RETENTION_MINUTES_CONFIG = 
"offsets.retention.minutes";
+public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60;
+public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed 
consumers, committed offset of a specific partition will be expired and 
discarded when 1) this retention period has elapsed after the consumer group 
loses all its consumers (i.e. becomes empty); " +
+"2) this retention period has elapsed since the last time an 
offset is committed for the partition and the group is no longer subscribed to 
the corresponding topic. " +
+"For standalone consumers (using manual assignment), offsets will 
be expired after this retention period has elapsed since the time of last 
commit. " +
+"Note that when a group is deleted via the delete-group request, 
its committed offsets will also be deleted without extra retention period; " +
+"also when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's committed offsets for that topic will 
also be deleted without extra retention period.";
+
+public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = 
"offsets.retention.check.interval.ms";
+public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 
60L;
+public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = 
"Frequency at which to check for stale offsets";
+
+public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = 
"offsets.topic.num.partitions";
+public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of 
partitions for the offset commit topic (should not change after deployment).";
+
+public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = 
"offsets.topic.segment.bytes";
+public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 
1024;
+public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets 
topic segment bytes should be kept relatively small in order to facilitate 
faster log compaction and cache loads.";
+
+public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = 
"offsets.topic.replication.factor";
+public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The 
replication factor for the offsets topic (set higher to ensure availability). " 
+
+"Internal topic creation will fail until the cluster size meets 
this replication factor requirement.";
+
+public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG = 
"offsets.topic.compression.codec";
+public static final CompressionType 
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE;
+public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = 
"Compression codec for the offsets topic - compression may be used to achieve 
\"atomic\" commits.";
+
+public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = 
"offsets.commit.timeout.ms";
+public static 

Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556482611


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = 
"transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number 
of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = 
"transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 
1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The 
transaction topic segment bytes should be kept relatively small in order to 
facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = 
"transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 
3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = 
"The replication factor for the transaction topic (set higher to ensure 
availability). " +
+"Internal topic creation will fail until the cluster size meets 
this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = 
"transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " 
+ 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
 + " config for the transaction topic.";
+
+public static final String TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG = 
"transaction.state.log.load.buffer.size";
+public static final int TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 
1024;
+public static final String TRANSACTIONS_LOAD_BUFFER_SIZE_DOC = "Batch size 
for reading from the transaction log segments when loading producer ids and 
transactions into the cache (soft-limit, overridden if records are too large).";
+
+public static final String 
TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG = 
ProducerStateManagerConfig.TRANSACTION_VERIFICATION_ENABLED;
+public static final boolean 
TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT = true;
+public static final String TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC = 
"Enable verification that checks that the partition has been added to the 
transaction before writing transactional records to the partition";
+
+public static final String PRODUCER_ID_EXPIRATION_MS_CONFIG = 
ProducerStateManagerConfig.PRODUCER_ID_EXPIRATION_MS;

Review Comment:
   moved TRANSACTION_VERIFICATION_ENABLED and PRODUCER_ID_EXPIRATION_MS to 
TransactionLogConfig where we have the docs and default values 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556478763


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {

Review Comment:
   Adding "s" to the name make sense but seems like we don't have one pattern 
in Kafka for example there are SaslConfigs and TopicConfig. I'll rename it but 
we properly need to update https://issues.apache.org/jira/browse/KAFKA-14524 to 
add some guidance for future refactors 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556481647


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {

Review Comment:
   > add private constructor
   
   Why do we need this? We usually don't add one if the config class exists just to 
group the props, docs and default values. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-04-08 Thread via GitHub


junrao commented on PR #15673:
URL: https://github.com/apache/kafka/pull/15673#issuecomment-2043718730

   @clolov : Thanks for the PR. When we last bumped up metadata.version 
(https://github.com/apache/kafka/pull/14984), we changed a bunch of tests such 
as MetadataVersionTest, ZkMigrationIntegrationTest, etc to reference 3.8-IV0. 
Have we added 3.8-IV1 to all needed tests?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556466418


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = 
"transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number 
of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = 
"transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 
1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The 
transaction topic segment bytes should be kept relatively small in order to 
facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = 
"transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 
3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = 
"The replication factor for the transaction topic (set higher to ensure 
availability). " +
+"Internal topic creation will fail until the cluster size meets 
this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = 
"transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " 
+ 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
 + " config for the transaction topic.";

Review Comment:
   We may not need `ServerTopicConfigSynonyms::serverSynonym` here, as it actually 
calls `ServerTopicConfigSynonyms.sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)`, 
which is the same as referring to `TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG` directly. 
   
   If we want to remove the dependency on `TopicConfig`, we could hard-code the value 
of `TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG` in the doc string, but I personally 
would prefer to keep the reference to the config. 
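
A minimal sketch of the two alternatives discussed above. The constant value is taken from the diff; the wrapper class and doc wording are illustrative only, not the code adopted in the PR:

    import org.apache.kafka.common.config.TopicConfig;

    // Illustrative only: the two doc-string options mentioned in the comment above.
    final class TransactionMinIsrDocSketch {
        // Option 1: keep the reference to TopicConfig, since
        // ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
        // resolves to the same "min.insync.replicas" name.
        static final String WITH_TOPIC_CONFIG_REF =
            "Overridden " + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG + " config for the transaction topic.";

        // Option 2: hard-code the name and drop the TopicConfig dependency.
        static final String HARD_CODED =
            "Overridden min.insync.replicas config for the transaction topic.";

        private TransactionMinIsrDocSketch() { }
    }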



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1556449383


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -17,13 +17,84 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.common.utils.Utils;

Review Comment:
   It seems we don't have a strict import order, but it would be nice to 
optimize the imports :)



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,53 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {
-public static final int DEFAULT_MAX_METADATA_SIZE = 4096;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
-public static final long DEFAULT_OFFSET_RETENTION_MS = 24 * 60 * 60 * 
1000L;
-public static final long DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS = 
60L;
-public static final int DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS = 50;
-public static final int DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES = 100 * 1024 * 
1024;
-public static final short DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR = 3;
-public static final CompressionType DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE 
= CompressionType.NONE;
-public static final int DEFAULT_OFFSET_COMMIT_TIMEOUT_MS = 5000;
-public static final short DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS = -1;
+public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = 
"offset.metadata.max.bytes";
+public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096;
+public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum 
size for a metadata entry associated with an offset commit.";
+
+public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = 
"offsets.load.buffer.size";
+public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
+public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for 
reading from the offsets segments when loading offsets into the cache 
(soft-limit, overridden if records are too large).";
+
+public static final String OFFSETS_RETENTION_MINUTES_CONFIG = 
"offsets.retention.minutes";
+public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60;
+public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed 
consumers, committed offset of a specific partition will be expired and 
discarded when 1) this retention period has elapsed after the consumer group 
loses all its consumers (i.e. becomes empty); " +
+"2) this retention period has elapsed since the last time an 
offset is committed for the partition and the group is no longer subscribed to 
the corresponding topic. " +
+"For standalone consumers (using manual assignment), offsets will 
be expired after this retention period has elapsed since the time of last 
commit. " +
+"Note that when a group is deleted via the delete-group request, 
its committed offsets will also be deleted without extra retention period; " +
+"also when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's committed offsets for that topic will 
also be deleted without extra retention period.";
+
+public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = 
"offsets.retention.check.interval.ms";
+public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 
60L;
+public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = 
"Frequency at which to check for stale offsets";
+
+public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = 
"offsets.topic.num.partitions";
+public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of 
partitions for the offset commit topic (should not change after deployment).";
+
+public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = 
"offsets.topic.segment.bytes";
+public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 
1024;
+public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets 
topic segment bytes should be kept relatively small in order to facilitate 
faster log compaction and cache loads.";
+
+public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = 
"offsets.topic.replication.factor";
+public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The 
replication factor for the offsets topic (set higher to ensure availability). " 
+
+"Internal topic creation will fail until the cluster size meets 
this replication factor requirement.";
+
+public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG = 

Re: [PR] MINOR: Revert to Gradle 8.5 [DO NOT MERGE YET] [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on PR #15553:
URL: https://github.com/apache/kafka/pull/15553#issuecomment-2043666756

   Any updates here? Gradle 8.7 has been released; we can update Gradle to 8.7 
if the issue no longer exists.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16477) Detect thread leaked client-metrics-reaper in tests

2024-04-08 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16477.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Detect thread leaked client-metrics-reaper in tests
> ---
>
> Key: KAFKA-16477
> URL: https://issues.apache.org/jira/browse/KAFKA-16477
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8.0
>
>
> After profiling the kafka tests, many `client-metrics-reaper` threads are not 
> cleaned up after BrokerServer shutdown.
> The thread {{client-metrics-reaper}} comes from 
> [ClientMetricsManager#expirationTimer|https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java#L115],
>  and BrokerServer#shutdown doesn't close ClientMetricsManager, which leaves the 
> timer thread running in the background.
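
Not the actual BrokerServer code, but a minimal sketch of the fix idea described above, assuming ClientMetricsManager exposes a close() (AutoCloseable) that cancels its expiration timer:

    import org.apache.kafka.common.utils.Utils;
    import org.apache.kafka.server.ClientMetricsManager;

    final class BrokerShutdownSketch {
        // Called from the broker's shutdown path; closeQuietly swallows close() errors
        // so shutdown can proceed even if the manager is already closed.
        static void closeClientMetrics(ClientMetricsManager clientMetricsManager) {
            Utils.closeQuietly(clientMetricsManager, "client metrics manager");
        }
    }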



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]

2024-04-08 Thread via GitHub


chia7712 merged PR #15668:
URL: https://github.com/apache/kafka/pull/15668


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556404908


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {

Review Comment:
   In order to make it a pure constants class, could you please make the following 
changes? (See the sketch below.)
   
   1. rename it to `TransactionLogConfigs`
   2. add `final`
   3. add a private constructor
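
A minimal sketch of the requested shape; the constants shown are copied from the diff above, everything else is illustrative rather than the final code in the PR:

    // Pure constants holder: final class, private constructor, static final fields only.
    public final class TransactionLogConfigs {
        public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
        public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
        public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC =
            "The number of partitions for the transaction topic (should not change after deployment).";

        private TransactionLogConfigs() {
            // prevents instantiation; the class only groups config names, docs and defaults
        }
    }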



##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = 
"transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number 
of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = 
"transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 
1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The 
transaction topic segment bytes should be kept relatively small in order to 
facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = 
"transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 
3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = 
"The replication factor for the transaction topic (set higher to ensure 
availability). " +
+"Internal topic creation will fail until the cluster size meets 
this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = 
"transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " 
+ 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
 + " config for the transaction topic.";

Review Comment:
   Maybe we can revise the docs to avoid depending on those modules directly?



##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = 
"transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number 
of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = 
"transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 
1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The 
transaction topic segment bytes should be kept relatively small in order to 
facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = 
"transaction.state.log.replication.factor";
+

[PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM opened a new pull request, #15684:
URL: https://github.com/apache/kafka/pull/15684

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on code in PR #15678:
URL: https://github.com/apache/kafka/pull/15678#discussion_r1556390789


##
server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java:
##
@@ -172,10 +172,11 @@ public void testAssignmentAggregation() throws 
InterruptedException {
 manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), DIR_3, () -> { 
});
 manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), DIR_1, () -> { 
});
 manager.onAssignment(new TopicIdPartition(TOPIC_2, 5), DIR_2, () -> { 
});
-while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) {
+TestUtils.waitForCondition(() -> {
 time.sleep(100);

Review Comment:
   IIRC, `TestUtils.waitForCondition` includes the sleep/backoff itself, so you don't 
need to call sleep manually
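
A minimal sketch of the pattern the reviewer describes, assuming org.apache.kafka.test.TestUtils and an illustrative completion flag (in the reviewed test, `time.sleep(100)` advances a mock clock, which is a separate concern from real blocking):

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.test.TestUtils;

    final class WaitForConditionSketch {
        // Hypothetical flag standing in for the test's "ready to assert" signal.
        private static final AtomicBoolean readyToAssert = new AtomicBoolean(false);

        static void awaitAggregation() throws InterruptedException {
            // waitForCondition re-evaluates the lambda with its own internal wait,
            // so no explicit sleep loop is needed around it.
            TestUtils.waitForCondition(readyToAssert::get, "assignments were not aggregated in time");
        }
    }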



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1556382498


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,19 +48,40 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends 
ConsumerGroupCommandTest {
+@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+private final ClusterInstance clusterInstance;
+public static final String TOPIC = "foo";
+public static final String GROUP = "test.group";
+
+DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance 
clusterInstance) { // Constructor injections
+this.clusterInstance = clusterInstance;
+}
+
 String[] getArgs(String group, String topic) {
 return new String[] {
-"--bootstrap-server", bootstrapServers(listenerName()),
+"--bootstrap-server", clusterInstance.bootstrapServers(),
 "--delete-offsets",
 "--group", group,
 "--topic", topic
 };
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteOffsetsNonExistingGroup(String quorum) {
+ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] 
args) {
+ConsumerGroupCommandOptions opts = 
ConsumerGroupCommandOptions.fromArgs(args);
+
+return new ConsumerGroupCommand.ConsumerGroupService(
+opts,
+Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, 
Integer.toString(Integer.MAX_VALUE))
+);
+}
+
+@ClusterTests({
+@ClusterTest(clusterType = Type.ZK),

Review Comment:
   Could we define `ClusterTestDefaults` at class-level?
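
A minimal sketch of the suggestion, assuming the test framework's kafka.test.annotation package and a `clusterType` attribute on `@ClusterTestDefaults`; the attribute names and class name are assumptions, not the final code in the PR:

    import kafka.test.ClusterInstance;
    import kafka.test.annotation.ClusterTest;
    import kafka.test.annotation.ClusterTestDefaults;
    import kafka.test.annotation.Type;
    import kafka.test.junit.ClusterTestExtensions;
    import org.junit.jupiter.api.Tag;
    import org.junit.jupiter.api.extension.ExtendWith;

    @Tag("integration")
    @ClusterTestDefaults(clusterType = Type.ZK)   // class-level default shared by every test method
    @ExtendWith(ClusterTestExtensions.class)
    public class DeleteOffsetsSketchTest {
        private final ClusterInstance clusterInstance;

        DeleteOffsetsSketchTest(ClusterInstance clusterInstance) { // constructor injection
            this.clusterInstance = clusterInstance;
        }

        @ClusterTest   // inherits the class-level defaults; per-test overrides stay possible
        public void testDeleteOffsetsNonExistingGroup() {
            // ... test body elided ...
        }
    }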



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1556371500


##
server/src/main/java/org/apache/kafka/server/config/KafkaSecurityConfigs.java:
##
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
+
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Common home for broker-side security configs which need to be accessible 
from the libraries shared
+ * between the broker and the multiple modules in Kafka.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class KafkaSecurityConfigs {
+
+/** * SSL Configuration /
+public final static String SSL_PROTOCOL_CONFIG = 
SslConfigs.SSL_PROTOCOL_CONFIG;
+public final static String SSL_PROTOCOL_DOC = SslConfigs.SSL_PROTOCOL_DOC;
+public static final String SSL_PROTOCOL_DEFAULT = 
SslConfigs.DEFAULT_SSL_PROTOCOL;
+
+public final static String SSL_PROVIDER_CONFIG = 
SslConfigs.SSL_PROVIDER_CONFIG;
+public final static String SSL_PROVIDER_DOC = SslConfigs.SSL_PROVIDER_DOC;
+
+public final static String SSL_CIPHER_SUITES_CONFIG = 
SslConfigs.SSL_CIPHER_SUITES_CONFIG;
+public final static String SSL_CIPHER_SUITES_DOC = 
SslConfigs.SSL_CIPHER_SUITES_DOC;
+
+public final static String SSL_ENABLED_PROTOCOLS_CONFIG = 
SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG;
+public final static String SSL_ENABLED_PROTOCOLS_DOC = 
SslConfigs.SSL_ENABLED_PROTOCOLS_DOC;
+public static final String SSL_ENABLED_PROTOCOLS_DEFAULTS = 
SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS;
+
+public final static String SSL_KEYSTORE_TYPE_CONFIG = 
SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
+public final static String SSL_KEYSTORE_TYPE_DOC = 
SslConfigs.SSL_KEYSTORE_TYPE_DOC;
+public static final String SSL_KEYSTORE_TYPE_DEFAULT = 
SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE;
+
+public final static String SSL_KEYSTORE_LOCATION_CONFIG = 
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+public final static String SSL_KEYSTORE_LOCATION_DOC = 
SslConfigs.SSL_KEYSTORE_LOCATION_DOC;
+
+public final static String SSL_KEYSTORE_PASSWORD_CONFIG = 
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
+public final static String SSL_KEYSTORE_PASSWORD_DOC = 
SslConfigs.SSL_KEYSTORE_PASSWORD_DOC;
+
+public final static String SSL_KEY_PASSWORD_CONFIG = 
SslConfigs.SSL_KEY_PASSWORD_CONFIG;
+public final static String SSL_KEY_PASSWORD_DOC = 
SslConfigs.SSL_KEY_PASSWORD_DOC;
+
+public final static String SSL_KEYSTORE_KEY_CONFIG = 
SslConfigs.SSL_KEYSTORE_KEY_CONFIG;
+public final static String SSL_KEYSTORE_KEY_DOC = 
SslConfigs.SSL_KEYSTORE_KEY_DOC;
+
+public final static String SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG = 
SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG;
+public final static String SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC = 
SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC;
+public final static String SSL_TRUSTSTORE_TYPE_CONFIG = 
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
+public final static String SSL_TRUSTSTORE_TYPE_DOC = 
SslConfigs.SSL_TRUSTSTORE_TYPE_DOC;
+public static final String SSL_TRUSTSTORE_TYPE_DEFAULT = 
SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
+
+public final static String SSL_TRUSTSTORE_LOCATION_CONFIG = 
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+public final static String SSL_TRUSTSTORE_PASSWORD_DOC = 
SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC;
+
+public final static String SSL_TRUSTSTORE_PASSWORD_CONFIG = 
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+public final static String SSL_TRUSTSTORE_LOCATION_DOC = 
SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC;
+
+public final static String SSL_TRUSTSTORE_CERTIFICATES_CONFIG = 

Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2043536012

   @nizhikov thanks for the updated PR. I have a major question: does 
`ConfigCommandIntegrationTest` have only zk-related tests? If so, we don't need to 
rewrite it in Java, as it will be removed directly. Also, it seems that we don't have 
an integration test for broker configs? Do you have free cycles to take that over? 
Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1556357170


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+public class KafkaConfig {

Review Comment:
   I renamed it to `KafkaSecurityConfigs` and left a comment noting that it's a 
central place for Kafka security configs. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-08 Thread via GitHub


junrao commented on code in PR #15631:
URL: https://github.com/apache/kafka/pull/15631#discussion_r1556331481


##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -952,8 +952,9 @@ class UnifiedLogTest {
 assertEquals(0, lastSeq)
   }
 
-  @Test
-  def testRetentionDeletesProducerStateSnapshots(): Unit = {
+  @ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with 
empty-active-segment: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testRetentionDeletesProducerStateSnapshots(isEmptyActiveSegment: 
Boolean): Unit = {

Review Comment:
   isEmptyActiveSegment => createEmptyActiveSegment  and change the test name 
accordingly.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1532,10 +1532,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 }
 localLog.checkIfMemoryMappedBufferClosed()
-// remove the segments for lookups
-localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, 
reason)
+if (segmentsToDelete.nonEmpty) {
+  // increment the local-log-start-offset before removing the segment 
for lookups

Review Comment:
   We probably should say "increment local-log-start-offset or 
log-start-offset" since `incrementStartOffset()` could update either one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KIP-848 system tests 2024-04-08 [kafka]

2024-04-08 Thread via GitHub


kirktrue opened a new pull request, #15683:
URL: https://github.com/apache/kafka/pull/15683

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1556226896


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+public class KafkaConfig {

Review Comment:
   > I think better approach would be moving default values to SslConfigs, 
SaslConfigs ,SecurityConfig and BrokerSecurityConfigs WDYT?
   
   agreed
   
   > KafkaConfig while it is an anti-pattern it has been acting as one place 
where we can find all KafkaConfig.
   
   Got it, I can buy the purpose. BTW, could we rename it to KafkaConfig"s" and 
make it a final class, since we don't want to have instances of it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16485: Broker metrics to follow kebab/hyphen case (KIP-714) [kafka]

2024-04-08 Thread via GitHub


dajac commented on PR #15680:
URL: https://github.com/apache/kafka/pull/15680#issuecomment-2043344035

   @apoorvmittal10 Haven’t we already released those metrics in 3.7?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16482) Eliminate the IDE warnings of accepting ClusterConfig in BeforeEach

2024-04-08 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835005#comment-17835005
 ] 

Chia-Ping Tsai commented on KAFKA-16482:


I'm a bit confused: should we encourage developers to set configs for all 
test cases by modifying `ClusterConfig` in the `BeforeEach` phase? It seems to me 
that we allow injecting `ClusterConfig` everywhere, so it is hard to be 
keenly aware of whether those changes actually have an effect on the cluster.

Personally, I prefer setting configs for all test cases via `ClusterTestDefaults` 
(this capability is not implemented yet), and individual configs of a test case can 
be changed via `ClusterTest`. 

I'd like to disallow injecting `ClusterConfig`. However, maybe there are some 
test cases that would need a lot of "if-else" to define the configs rather than 
just passing constants.

[~davidarthur] WDYT?

> Eliminate the IDE warnings of accepting ClusterConfig in BeforeEach
> ---
>
> Key: KAFKA-16482
> URL: https://issues.apache.org/jira/browse/KAFKA-16482
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Cheng-Kai, Zhang
>Priority: Major
>
> IDE does not like the code style, and we can leverage `ClusterConfigProperty` 
> to eliminate the false error from IDE
>  
> [https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala#L42]
> [https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java#L75]
>  
> https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java#L68



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2043274125

   @junrao thanks for the reviews. Both comments are addressed in 
https://github.com/apache/kafka/pull/15621/commits/581242c1fa6c005bf91a7ced96932774c2c02cd9


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2024-04-08 Thread via GitHub


junrao commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1556161125


##
server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java:
##
@@ -72,18 +72,20 @@ public CheckpointFile(File file,
 tempPath = Paths.get(absolutePath + ".tmp");
 }
 
-public void write(Collection entries) throws IOException {
+public void write(Collection entries, boolean sync) throws IOException {
 synchronized (lock) {
 // write to temp file and then swap with the existing file
 try (FileOutputStream fileOutputStream = new 
FileOutputStream(tempPath.toFile());
  BufferedWriter writer = new BufferedWriter(new 
OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
 CheckpointWriteBuffer checkpointWriteBuffer = new 
CheckpointWriteBuffer<>(writer, version, formatter);
 checkpointWriteBuffer.write(entries);
 writer.flush();
-fileOutputStream.getFD().sync();
+if (sync) {
+fileOutputStream.getFD().sync();

Review Comment:
   @ocadaruma : I realized a potential issue with this change. The issue is 
that if sync is false, we don't force a flush to disk. However, the OS could 
flush partial content of the leader epoch file. If the broker has a hard 
failure, the leader epoch file could be corrupted. In the recovery path, since 
we always expect the leader epoch file to be well-formed, a corrupted leader 
epoch file will fail the recovery.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1556146632


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1556142547


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


kirktrue commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1556133511


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, 
Optional subscribedTopicNames = new 
TreeSet<>(this.subscriptions.subscription());
-if (sendAllFields || 
!subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
-data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
-sentFields.subscribedTopicNames = subscribedTopicNames;
-}
-} else {
-// SubscribedTopicRegex - only sent if it has changed since 
the last heartbeat
-//  - not supported yet
+// SubscribedTopicNames - only sent if has changed since the last 
heartbeat
+TreeSet subscribedTopicNames = new 
TreeSet<>(this.subscriptions.subscription());
+if (sendAllFields || 
!subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
+sentFields.subscribedTopicNames = subscribedTopicNames;

Review Comment:
   Nice!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2043179367

   @cadonna could you take a look at this one when you have a chance? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-08 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1556068446


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -56,11 +60,33 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   @AfterEach
   override def tearDown(): Unit = {
-setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListMaxTimestampWithEmptyLog(quorum: String): Unit = {
+val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topicName)
+assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, 
maxTimestampOffset.offset())
+assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 
maxTimestampOffset.timestamp())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testListVersion0(quorum: String): Unit = {
+// create records for version 0
+createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V0)
+produceMessagesInSeparateBatch()
+
+// update version to version 1 to list offset for max timestamp
+createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1)
+// the offset of max timestamp is always -1 if the batch version is 0
+verifyListOffsets(expectedMaxTimestampOffset = -1)
+

Review Comment:
   extra new line



##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -123,7 +149,7 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
   @ParameterizedTest
   @ValueSource(strings = Array("zk"))
   def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): 
Unit = {
-createOldMessageFormatBrokers()
+createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1)

Review Comment:
   > // In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
   > // So in this separate batch test, it'll be the last offset 2
   The comment in line 159 is not very accurate. Since we advance the time for 
each batch, the maxTimestampOffset is the message in the last batch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1556104853


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]

2024-04-08 Thread via GitHub


lianetm commented on PR #15669:
URL: https://github.com/apache/kafka/pull/15669#issuecomment-2043121631

   Hey @cadonna, could you take a look if you have some time too? Thanks!  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-04-08 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16474:
--

Assignee: Lianet Magrans  (was: Philip Nee)

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back. This might 
> cause the consumer to revoke its just-assigned partitions in some cases. For 
> example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId=''.
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='', because it has not gotten any response and thus has not updated its 
> local state.
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat and revokes all of 
> its partitions.
>  
> There are 2 issues associated with this bug:
>  # inflight logic
>  # rapid poll: In KAFKA-16389 we've observed the consumer polling interval to 
> be a few ms.  
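
Not the actual AsyncKafkaConsumer or heartbeat manager code; a minimal sketch of the in-flight guard idea implied by issue 1 above, with all names illustrative:

    // Only one heartbeat may be outstanding at a time; the next one is sent
    // only after the previous response has been processed.
    final class HeartbeatInFlightGuardSketch {
        private boolean heartbeatInFlight = false;

        synchronized boolean maybeSendHeartbeat(Runnable send) {
            if (heartbeatInFlight) {
                return false;          // a request is already outstanding; skip this attempt
            }
            heartbeatInFlight = true;  // mark outstanding before handing off the request
            send.run();
            return true;
        }

        synchronized void onHeartbeatResponse() {
            heartbeatInFlight = false; // member epoch/id would be updated from the response here
        }
    }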



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]

2024-04-08 Thread via GitHub


brandboat commented on code in PR #15659:
URL: https://github.com/apache/kafka/pull/15659#discussion_r1556052135


##
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java:
##
@@ -300,6 +301,15 @@ public void testGetReplicaAssignments() throws Exception {
 
 assertEquals(assignments,
 getReplicaAssignmentForPartitions(adminClient, new 
HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0);
+
+assignments.clear();

Review Comment:
   You're right, already removed it in the latest commit, many thanks :smiley: 






Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]

2024-04-08 Thread via GitHub


brandboat commented on code in PR #15668:
URL: https://github.com/apache/kafka/pull/15668#discussion_r1556044180


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -112,7 +113,7 @@ public ClientMetricsManager(ClientMetricsReceiverPlugin 
receiverPlugin, int clie
 this.subscriptionMap = new ConcurrentHashMap<>();
 this.subscriptionUpdateVersion = new AtomicInteger(0);
 this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CACHE_MAX_SIZE));
-this.expirationTimer = new SystemTimerReaper("client-metrics-reaper", 
new SystemTimer("client-metrics"));
+this.expirationTimer = new 
SystemTimerReaper(CLIENT_METRICS_REAPER_THREAD_NAME, new 
SystemTimer("client-metrics"));

Review Comment:
   Sure, addressed in 
https://github.com/apache/kafka/pull/15668/commits/2da40c26541e727f2cd38bb9fb64ee41e5ca42c2
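
For background, the point of naming the reaper thread via a shared constant is that tests can look it up when checking for leaks. A minimal, framework-agnostic sketch of that kind of check (the constant value and the idea of closing ClientMetricsManager first are assumptions for illustration, not the actual test in this PR):

```java
import java.util.Set;

final class ThreadLeakCheckSketch {

    // True if any live thread name starts with the given prefix,
    // e.g. "client-metrics-reaper" after the manager has been closed.
    static boolean hasLiveThreadWithPrefix(String prefix) {
        Set<Thread> liveThreads = Thread.getAllStackTraces().keySet();
        return liveThreads.stream().anyMatch(t -> t.getName().startsWith(prefix));
    }
}
```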






[jira] [Resolved] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-08 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-16478.

Resolution: Fixed

> Links for Kafka 3.5.2 release are broken
> 
>
> Key: KAFKA-16478
> URL: https://issues.apache.org/jira/browse/KAFKA-16478
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 3.5.2
>Reporter: Philipp Trulson
>Assignee: Mickael Maison
>Priority: Major
>
> While trying to update our setup, I noticed that the download links for the 
> 3.5.2 release are broken. They all point to a different host and also contain 
> an additional `/kafka` in their URL. Compare:
> not working:
> [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]
> working:
> [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
> [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]
> This goes for all links in the release - archives, checksums, signatures.





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2042958530

   Hey @Phuc-Hong-Tran, thanks for the update, left some more comments. Almost 
there! Thanks!





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1555969469


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1751,16 +1753,7 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-16280: Expose method to determine metric measurability (KIP-1019) [kafka]

2024-04-08 Thread via GitHub


apoorvmittal10 commented on PR #15681:
URL: https://github.com/apache/kafka/pull/15681#issuecomment-2042866833

   > Tiny nit. It's KIP-1019 I think, not KIP-109.
   
   Corrected the description. Thanks @AndrewJSchofield.





Re: [PR] KAFKA-16280: Expose method to determine metric measurability (KIP-1019) [kafka]

2024-04-08 Thread via GitHub


AndrewJSchofield commented on PR #15681:
URL: https://github.com/apache/kafka/pull/15681#issuecomment-2042864722

   Tiny nit. It's KIP-1019 I think, not KIP-109.





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1555915666


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1667,6 +1669,7 @@ private void updateLastSeenEpochIfNewer(TopicPartition 
topicPartition, OffsetAnd
 public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
 maybeThrowFencedInstanceException();
 maybeInvokeCommitCallbacks();
+maybeUpdateSubscriptionMetadata();
 backgroundEventProcessor.process();
 
 // Keeping this updateAssignmentMetadataIfNeeded wrapping up the 
updateFetchPositions as

Review Comment:
   Since we're here, adding logic that extends the purpose of 
`updateAssignmentMetadataIfNeeded`, we could clean up and remove this outdated 
comment.






Re: [PR] KAFKA-16486: Integrate KIP-1019 measurability changes (KIP-714) [kafka]

2024-04-08 Thread via GitHub


apoorvmittal10 commented on PR #15682:
URL: https://github.com/apache/kafka/pull/15682#issuecomment-2042864619

   The build depends on the merge of PR: 
https://github.com/apache/kafka/pull/15681





[PR] KAFKA-16486: Integrate KIP-1019 measurability changes (KIP-714) [kafka]

2024-04-08 Thread via GitHub


apoorvmittal10 opened a new pull request, #15682:
URL: https://github.com/apache/kafka/pull/15682

   The PR implements the changes defined in KIP-1019. It removes the 
reflection-based access to the KafkaMetric field and instead uses the method 
exposed by KIP-1019 to determine metric measurability.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)





[PR] KAFKA-16280: Expose method to determine metric measurability (KIP-1019) [kafka]

2024-04-08 Thread via GitHub


apoorvmittal10 opened a new pull request, #15681:
URL: https://github.com/apache/kafka/pull/15681

   The PR implements the changes defined in 
[KIP-109](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability),
 which exposes a method to check whether a metric is of type Measurable.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
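
   To make the intent concrete, a sketch of what collectors can do once this lands. The accessor name `isMeasurable()` is taken from the KIP title and discussion and is an assumption here, not a quote of the merged code:

```java
import org.apache.kafka.common.metrics.KafkaMetric;

final class MeasurabilityCheckSketch {

    // Before KIP-1019, telemetry collectors had to reach into the metric's private
    // value provider via reflection to make this decision; afterwards the metric
    // can be asked directly (assumed accessor name).
    static boolean shouldCollectAsMeasurable(KafkaMetric metric) {
        return metric.isMeasurable();
    }
}
```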
   





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1555909363


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##
@@ -39,9 +39,8 @@ class PlaintextConsumerSubscriptionTest extends 
AbstractConsumerTest {
* metadata refresh the consumer becomes subscribed to this new topic and 
all partitions
* of that topic are assigned to it.
*/
-  // TODO: enable this test for the consumer group protocol when support for 
pattern subscriptions is implemented.
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))

Review Comment:
   If we don't use the 
`getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly` function 
anymore we should remove its definition from this file now. 






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-04-08 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1555905789


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +520,124 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";

Review Comment:
   @junrao thanks for the details. I have opened the PR to address same: 
https://github.com/apache/kafka/pull/15680
   cc: @AndrewJSchofield 






[PR] KAFKA-16485: Broker metrics to follow kebab/hyphen case (KIP-714) [kafka]

2024-04-08 Thread via GitHub


apoorvmittal10 opened a new pull request, #15680:
URL: https://github.com/apache/kafka/pull/15680

   The PR updates the broker metric names to kebab/hyphen case as pointed out 
by @junrao in the comment below: 
https://github.com/apache/kafka/pull/15251#discussion_r1498439741
   
   I have also removed the redundant `client-metrics-` prefix from all metrics, as 
the group name is `client-metrics` itself.
   
   Once merged, I'll update the KIP accordingly.
   
   - jconsole details:
   
   https://github.com/apache/kafka/assets/2861565/f70d72b7-22ad-49ee-95e8-3898623817f1
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
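
   As a rough illustration of the naming convention (the specific metric name and description below are assumptions, not the PR's final names), a kebab-case metric registered through the common Metrics API looks like this; the group already scopes the metric, so no `client-metrics-` prefix is needed in the name itself:

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;

final class KebabCaseMetricNameSketch {

    // Hypothetical example: "instance-count" in group "client-metrics".
    static MetricName instanceCountName(Metrics metrics) {
        return metrics.metricName("instance-count", "client-metrics",
                "Count of client metric instances cached on the broker.");
    }
}
```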
   





[PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-08 Thread via GitHub


FrankYang0529 opened a new pull request, #15679:
URL: https://github.com/apache/kafka/pull/15679

   By using ClusterTestExtensions, 
`DeleteOffsetsConsumerGroupCommandIntegrationTest` gets away from the 
`KafkaServerTestHarness` dependency.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
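
   For readers unfamiliar with the framework, a rough sketch of the shape such a migrated test takes. The package, annotation and accessor names here (`kafka.test.ClusterInstance`, `@ClusterTest`, `bootstrapServers()`) are recalled from the in-tree test framework and should be treated as assumptions rather than the exact code in this PR:

```java
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.junit.ClusterTestExtensions;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ClusterTestExtensions.class)
class DeleteOffsetsExampleTest {

    @ClusterTest
    void worksAgainstInjectedCluster(ClusterInstance cluster) {
        // The extension starts the cluster and injects a handle to it,
        // so no KafkaServerTestHarness subclassing is needed.
        System.out.println("bootstrap servers: " + cluster.bootstrapServers());
    }
}
```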
   





Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1555878104


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+public class KafkaConfig {

Review Comment:
   We can rename it to `KafkaSecurityConfig` as a common ground between the two 
approaches: it keeps a central place for all security configs and breaks out of 
the KafkaConfig anti-pattern. I don't mind either way. 
   
   It just might be confusing, as we already have `SecurityConfig` and 
`BrokerSecurityConfigs`; we just need to be mindful about naming the new one.






Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1555878104


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+public class KafkaConfig {

Review Comment:
   We can rename it to `KafkaSecurityConfig` as a common ground between the two 
approaches: it keeps a central place for all security configs and breaks out of 
the KafkaConfig anti-pattern. I don't mind either way. 






Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1555878104


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+public class KafkaConfig {

Review Comment:
   We can rename it to `KafkaSecurityConfig` as a common ground between the two 
approaches: it keeps a central place for all security configs and breaks out of 
the KafkaConfig anti-pattern.






Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1555869666


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+public class KafkaConfig {

Review Comment:
   We can, but we already have `SslConfigs`, `SaslConfigs`, `SecurityConfig` 
and `BrokerSecurityConfigs`. While KafkaConfig is an anti-pattern, it has 
been acting as the one place where we can find all Kafka configs. 
   
   I think a better approach would be moving the default values to `SslConfigs`, 
`SaslConfigs`, `SecurityConfig` and `BrokerSecurityConfigs`. WDYT?
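
   As an illustration of that suggestion (the class below is hypothetical, not code from the PR), keeping a config's name, default and doc together in its `*Configs` class would look roughly like this:

```java
// Hypothetical layout only.
public final class ExampleSaslConfigs {

    public static final String SASL_KERBEROS_TICKET_RENEW_JITTER_CONFIG = "sasl.kerberos.ticket.renew.jitter";
    public static final double SASL_KERBEROS_TICKET_RENEW_JITTER_DEFAULT = 0.05;
    public static final String SASL_KERBEROS_TICKET_RENEW_JITTER_DOC =
            "Percentage of random jitter added to the renewal time.";

    private ExampleSaslConfigs() { }
}
```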






Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1555869666


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+public class KafkaConfig {

Review Comment:
   We can, but we already have `SslConfigs`, `SecurityConfig` and 
`BrokerSecurityConfigs`. While KafkaConfig is an anti-pattern, it has been 
acting as the one place where we can find all Kafka configs. 
   
   I think a better approach would be moving the default values to `SslConfigs`, 
`SecurityConfig` and `BrokerSecurityConfigs`. WDYT?






Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1555869666


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+public class KafkaConfig {

Review Comment:
   We can, but we already have `SslConfigs`, `SecurityConfig` and 
`BrokerSecurityConfigs`. While KafkaConfig is an anti-pattern, it has been 
acting as the one place where we can find all Kafka configs. 
   
   I think a better approach would be moving the docs and default values to 
`SslConfigs`, `SecurityConfig` and `BrokerSecurityConfigs`. WDYT?






[jira] [Created] (KAFKA-16486) Integrate metric measurability changes in metrics collector

2024-04-08 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-16486:
-

 Summary: Integrate metric measurability changes in metrics 
collector
 Key: KAFKA-16486
 URL: https://issues.apache.org/jira/browse/KAFKA-16486
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal








[jira] [Updated] (KAFKA-16485) Fix broker metrics to follow kebab/hyphen case

2024-04-08 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-16485:
--
Parent: KAFKA-15601
Issue Type: Sub-task  (was: Improvement)

> Fix broker metrics to follow kebab/hyphen case
> --
>
> Key: KAFKA-16485
> URL: https://issues.apache.org/jira/browse/KAFKA-16485
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>






[jira] [Created] (KAFKA-16485) Fix broker metrics to follow kebab/hyphen case

2024-04-08 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-16485:
-

 Summary: Fix broker metrics to follow kebab/hyphen case
 Key: KAFKA-16485
 URL: https://issues.apache.org/jira/browse/KAFKA-16485
 Project: Kafka
  Issue Type: Improvement
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal








[PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]

2024-04-08 Thread via GitHub


chiacyu opened a new pull request, #15678:
URL: https://github.com/apache/kafka/pull/15678

   We should replace the while loops in some test cases with 
`TestUtils.waitForCondition` to prevent the tests from looping forever.
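
   A minimal sketch of the pattern, assuming the `org.apache.kafka.test.TestUtils` helper; the condition itself is just a placeholder:

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.test.TestUtils;

class WaitForConditionSketch {

    // Bounded wait instead of `while (!done.get()) { }`, so the test cannot hang forever.
    void awaitFlag(AtomicBoolean done) throws InterruptedException {
        TestUtils.waitForCondition(done::get, "flag was not set in time");
    }
}
```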
   
   
   





Re: [PR] KAFKA-16481: Fixing flaky test kafka.server.ReplicaManagerTest#testRemoteLogReaderMetrics [kafka]

2024-04-08 Thread via GitHub


showuon merged PR #15677:
URL: https://github.com/apache/kafka/pull/15677





Re: [PR] KAFKA-16481: Fixing flaky test kafka.server.ReplicaManagerTest#testRemoteLogReaderMetrics [kafka]

2024-04-08 Thread via GitHub


showuon commented on PR #15677:
URL: https://github.com/apache/kafka/pull/15677#issuecomment-2042592569

   Failed tests are unrelated.





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-08 Thread via GitHub


showuon commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2042586830

   Will check it this week.





Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on PR #15489:
URL: https://github.com/apache/kafka/pull/15489#issuecomment-2042555906

   > Looks like the build still contains failed tests :(
   
   Yep, I have filed another PR, #15654, to dig into that.





Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-04-08 Thread via GitHub


Owen-CH-Leung commented on PR #15489:
URL: https://github.com/apache/kafka/pull/15489#issuecomment-2042547986

   > rebase to trigger QA again
   
   Looks like the build still contains failed tests :(





Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-08 Thread via GitHub


cadonna commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1555453187


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -293,17 +288,12 @@ public void cleanup() throws IOException {
 task = null;
 }
 Utils.delete(BASE_DIR);
-mockito.finishMocking();
 }
 
 @Test
-public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws 
IOException {
-stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
-stateManager.registerStore(stateStore, 
stateStore.stateRestoreCallback, null);
-EasyMock.expectLastCall();
-EasyMock.replay(stateDirectory, stateManager);
+public void shouldThrowLockExceptionIfFailedToLockStateDirectory() {
+stateDirectory = mock(StateDirectory.class);

Review Comment:
   I am wondering whether we leak resources if we assign a mock to 
`stateDirectory` without closing the state directory first.
   In `setup()` an actual state directory is created with 
   ```
   stateDirectory = new StateDirectory(createConfig("100"), new MockTime(), 
true, false);  
   ```
   It was there before this PR, but we should fix it.
   
   You can either close the state directory before the mock is assigned or 
remove the creation of the state directory from `setup()` and create it in each 
test method that uses it. 
   
   Same is true for the other occurrences in this test class.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1912,42 +1823,32 @@ public void shouldThrowIfPostCommittingOnIllegalState() 
{
 
 @Test
 public void shouldSkipCheckpointingSuspendedCreatedTask() {
-stateManager.checkpoint();
-EasyMock.expectLastCall().andThrow(new AssertionError("Should not have 
tried to checkpoint"));
-
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-EasyMock.replay(stateManager, recordCollector);
-
 task = createStatefulTask(createConfig("100"), true);
 task.suspend();
 task.postCommit(true);
+
+verify(stateManager, never()).checkpoint();
 }
 
 @Test
 public void shouldCheckpointForSuspendedTask() {
-stateManager.checkpoint();
-EasyMock.expectLastCall().once();
-EasyMock.expect(stateManager.changelogOffsets())
-.andReturn(singletonMap(partition1, 1L));
-
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-EasyMock.replay(stateManager, recordCollector);
+when(stateManager.changelogOffsets())
+.thenReturn(singletonMap(partition1, 1L));

Review Comment:
   nit:
   ```suggestion
   
when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, 1L));
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -411,25 +384,22 @@ public void seek(final TopicPartition partition, final 
long offset) {
 
 shouldNotSeek.set(new AssertionError("Should not seek"));
 
+@SuppressWarnings("unchecked")
 final java.util.function.Consumer> resetter =
-EasyMock.mock(java.util.function.Consumer.class);
-resetter.accept(Collections.singleton(partition1));
-EasyMock.expectLastCall();
-EasyMock.replay(resetter);
+mock(java.util.function.Consumer.class);
+doNothing().when(resetter).accept(Collections.singleton(partition1));

Review Comment:
   This should be a verification. However, there is an issue here. If I add it 
to the verifications with 
   ```java
   verify(resetter).accept(Collections.singleton(partition1));
   ```
   the test fails.
   The reason is that when ` accept()` is called, the argument is indeed 
`Collections.singleton(partition1)` but after the call the collection is 
cleared:
   
   
https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L955
   
   By the time the call is verified, the argument has changed. Apparently, Mockito 
stores a reference to the argument in the invocation.
   
   One way to solve this is the following:
   
   ```java
   final java.util.function.Consumer<Set<TopicPartition>> resetter =
       mock(java.util.function.Consumer.class);
   final Set<TopicPartition> partitionsAtCall = new HashSet<>();
   doAnswer(
       invocation -> {
           partitionsAtCall.addAll(invocation.getArgument(0));
           return null;
       }
   ).when(resetter).accept(Collections.singleton(partition1));
   
   task.initializeIfNeeded();
   task.completeRestoration(resetter);
   
   // because we mocked the 
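
A self-contained sketch of the pitfall described above, using generic placeholder types rather than the StreamTask test fixtures: Mockito records a reference to the argument, so if the production code later mutates the collection, a plain verify() compares against the mutated (here, emptied) set, while copying inside doAnswer() preserves what was actually passed.

```java
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

final class CapturedArgumentCopySketch {

    @SuppressWarnings("unchecked")
    static Set<String> callThenMutate() {
        Consumer<Set<String>> consumer = mock(Consumer.class);
        Set<String> seenAtCallTime = new HashSet<>();
        doAnswer(invocation -> {
            // Copy the argument while it is still intact.
            seenAtCallTime.addAll(invocation.getArgument(0));
            return null;
        }).when(consumer).accept(anySet());

        Set<String> arg = new HashSet<>();
        arg.add("p1");
        consumer.accept(arg);
        arg.clear();            // simulates the production code clearing the set afterwards
        return seenAtCallTime;  // still contains "p1"
    }
}
```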

Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1555661766


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1062,21 +1027,21 @@ object KafkaConfig {
   .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, 
in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc)
 
   /** * Transaction management configuration ***/
-  .define(TransactionalIdExpirationMsProp, INT, 
Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, 
TransactionalIdExpirationMsDoc)
-  .define(TransactionsMaxTimeoutMsProp, INT, 
Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, 
TransactionsMaxTimeoutMsDoc)
-  .define(TransactionsTopicMinISRProp, INT, 
Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, 
TransactionsTopicMinISRDoc)
-  .define(TransactionsLoadBufferSizeProp, INT, 
Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, 
TransactionsLoadBufferSizeDoc)
-  .define(TransactionsTopicReplicationFactorProp, SHORT, 
Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, 
TransactionsTopicReplicationFactorDoc)
-  .define(TransactionsTopicPartitionsProp, INT, 
Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, 
TransactionsTopicPartitionsDoc)
-  .define(TransactionsTopicSegmentBytesProp, INT, 
Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, 
TransactionsTopicSegmentBytesDoc)
-  .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, 
Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, 
TransactionsAbortTimedOutTransactionsIntervalMsDoc)
-  .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, 
INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, 
TransactionsRemoveExpiredTransactionsIntervalMsDoc)
-
-  .define(TransactionPartitionVerificationEnableProp, BOOLEAN, 
Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, 
TransactionPartitionVerificationEnableDoc)
-
-  .define(ProducerIdExpirationMsProp, INT, 
Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+  
.define(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, 
INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, 
TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)

Review Comment:
   Good call, pushed a refactor for this 






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-08 Thread via GitHub


lucasbru merged PR #15525:
URL: https://github.com/apache/kafka/pull/15525





Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-04-08 Thread via GitHub


clolov commented on PR #15673:
URL: https://github.com/apache/kafka/pull/15673#issuecomment-2042214344

   Yup, I need to change the kafka-get-offsets tool to easily access said 
functionality, but I am in the process of raising that PR





Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-08 Thread via GitHub


nizhikov commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2042116296

   Hello @chia7712 
   
   > we can complete it in another PR before this PR.
   
   `junit-platform.properties` added for the core and tools modules.
   Can you please review this test refactoring?





[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-08 Thread Philipp Trulson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834821#comment-17834821
 ] 

Philipp Trulson commented on KAFKA-16478:
-

Thanks for the PR! Unfortunately it was too late in my TZ already, LGTM :) 

> Links for Kafka 3.5.2 release are broken
> 
>
> Key: KAFKA-16478
> URL: https://issues.apache.org/jira/browse/KAFKA-16478
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 3.5.2
>Reporter: Philipp Trulson
>Assignee: Mickael Maison
>Priority: Major
>
> While trying to update our setup, I noticed that the download links for the 
> 3.5.2 release are broken. They all point to a different host and also contain 
> an additional `/kafka` in their URL. Compare:
> not working:
> [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]
> working:
> [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
> [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]
> This goes for all links in the release - archives, checksums, signatures.





Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]

2024-04-08 Thread via GitHub


showuon commented on code in PR #15659:
URL: https://github.com/apache/kafka/pull/15659#discussion_r1555327569


##
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java:
##
@@ -300,6 +301,15 @@ public void testGetReplicaAssignments() throws Exception {
 
 assertEquals(assignments,
 getReplicaAssignmentForPartitions(adminClient, new 
HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0)))));
+
+assignments.clear();

Review Comment:
   Sorry, what's the purpose of explicitly clearing `assignments` here? I think 
it is only used as a local-scope variable, so we should not clear it.






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-08 Thread via GitHub


showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1555305760


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  if (this.promotionStates.containsKey(topicPartition)) {
+val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionStates.get(topicPartition)
+// Revert any reassignments for partitions that did not complete the 
future replica promotion
+if (originalDir.isDefined && topicId.isDefined && 
reassignmentState.maybeInconsistentMetadata) {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () 
=> ())
+}
+this.promotionStates.remove(topicPartition)
+  }

Review Comment:
   In your opinion, under what circumstance would `promotionStates` not 
contain the topic partition? Maybe if `removePartitions` got called multiple 
times? I'm thinking: before this change we would send out an 
`AssignReplicasToDirsRequest` no matter what, but now we skip it when there is 
no `PromotionState`. Will that cause any potential problem, maybe when 
upgrading? (Is it possible?) Should we still invoke 
`directoryEventHandler.handleAssignment` even if there is no `PromotionState`? 
I think not, but I'd like to hear your thoughts here.






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-08 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1555264597


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
+
+public class ConnectorOffsetBackingStoreTest {
+
+private static final String NAMESPACE = "namespace";
+// Connect format - any types should be accepted here
+private static final Map OFFSET_KEY = 
Collections.singletonMap("key", "key");
+private static final Map OFFSET_VALUE = 
Collections.singletonMap("key", 12);
+
+// Serialized
+private static final byte[] OFFSET_KEY_SERIALIZED = 
"key-serialized".getBytes();
+private static final byte[] OFFSET_VALUE_SERIALIZED = 
"value-serialized".getBytes();
+
+private static final Exception PRODUCE_EXCEPTION = new KafkaException();
+
+private final Converter keyConverter = mock(Converter.class);
+private final Converter valueConverter = mock(Converter.class);

Review Comment:
   We tend to use the `@Mock` annotation instead of final fields:
   
   ```suggestion
   @Mock
   private Converter keyConverter;
   @Mock
   private Converter valueConverter;
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import