Re: [PR] KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port [kafka]

2024-06-12 Thread via GitHub


showuon commented on code in PR #16319:
URL: https://github.com/apache/kafka/pull/16319#discussion_r1637590625


##
clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java:
##
@@ -113,16 +113,26 @@ public void testGetHost() {
 assertEquals("mydomain.com", getHost("PLAINTEXT://mydomain.com:8080"));
 assertEquals("MyDomain.com", getHost("PLAINTEXT://MyDomain.com:8080"));
 assertEquals("My_Domain.com", getHost("PLAINTEXT://My_Domain.com:8080"));
+assertEquals("mydomain.com", getHost("SASL_PLAINTEXT://mydomain.com:8080"));
+assertEquals("MyDomain.com", getHost("SASL_PLAINTEXT://MyDomain.com:8080"));
+assertEquals("My_Domain.com", getHost("SASL_PLAINTEXT://My_Domain.com:8080"));
 assertEquals("::1", getHost("[::1]:1234"));
 assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
 assertEquals("2001:DB8:85A3:8D3:1319:8A2E:370:7348", getHost("PLAINTEXT://[2001:DB8:85A3:8D3:1319:8A2E:370:7348]:5678"));
 assertEquals("fe80::b1da:69ca:57f7:63d8%3", getHost("PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:5678"));
+assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("SASL_PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));

Review Comment:
   @frankvicky, we have 4 kinds of security protocols; let's test all of them to make sure we don't break anything. I think we can use `ParameterizedTest` here:
   
   ```
   @ParameterizedTest
   @CsvSource(value = {"PLAINTEXT", "SASL_PLAINTEXT", ...})
   ```
   
   ref: https://kafka.apache.org/documentation/#brokerconfigs_security.inter.broker.protocol
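As a minimal sketch of the reviewer's suggestion that runs without JUnit, the loop below applies the same host/port checks once per value of `security.inter.broker.protocol` (`PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, `SASL_SSL`). `HostPortParser` and its regex are hypothetical stand-ins, not Kafka's `Utils.getHost`/`getPort`; the scheme part deliberately accepts `_` so that names such as `SASL_PLAINTEXT` parse.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for Utils.getHost/getPort; NOT Kafka's implementation.
final class HostPortParser {
    // Optional "scheme://" prefix, optional [brackets] around IPv6 hosts, then ":port".
    // The scheme character class includes '_' so SASL_PLAINTEXT / SASL_SSL are accepted.
    private static final Pattern HOST_PORT = Pattern.compile(
        "^(?:[a-zA-Z][a-zA-Z0-9+.\\-_]*://)?\\[?([0-9a-zA-Z\\-%._:]+)\\]?:([0-9]+)$");

    static String getHost(String address) {
        Matcher m = HOST_PORT.matcher(address);
        return m.matches() ? m.group(1) : null;
    }

    static Integer getPort(String address) {
        Matcher m = HOST_PORT.matcher(address);
        return m.matches() ? Integer.valueOf(m.group(2)) : null;
    }
}

final class SecurityProtocolHostPortCheck {
    // The four values accepted by security.inter.broker.protocol.
    static final List<String> PROTOCOLS =
        List.of("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");

    // Plain-Java equivalent of one parameterized test invocation.
    static void checkProtocol(String protocol) {
        expect("mydomain.com", HostPortParser.getHost(protocol + "://mydomain.com:8080"));
        expect("My_Domain.com", HostPortParser.getHost(protocol + "://My_Domain.com:8080"));
        expect("2001:db8:85a3:8d3:1319:8a2e:370:7348",
            HostPortParser.getHost(protocol + "://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
        expect(8080, HostPortParser.getPort(protocol + "://mydomain.com:8080"));
    }

    private static void expect(Object expected, Object actual) {
        if (!expected.equals(actual)) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
    }

    public static void main(String[] args) {
        for (String protocol : PROTOCOLS) {
            checkProtocol(protocol);
        }
        System.out.println("all " + PROTOCOLS.size() + " protocols passed");
    }
}
```

In the actual test class, `checkProtocol` would map naturally onto a JUnit 5 `@ParameterizedTest` method fed by `@ValueSource(strings = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"})`, or by the `@CsvSource` shown in the comment.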



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port [kafka]

2024-06-12 Thread via GitHub


showuon commented on code in PR #16319:
URL: https://github.com/apache/kafka/pull/16319#discussion_r1637590625


##
clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java:
##
@@ -113,16 +113,26 @@ public void testGetHost() {
 assertEquals("mydomain.com", getHost("PLAINTEXT://mydomain.com:8080"));
 assertEquals("MyDomain.com", getHost("PLAINTEXT://MyDomain.com:8080"));
 assertEquals("My_Domain.com", getHost("PLAINTEXT://My_Domain.com:8080"));
+assertEquals("mydomain.com", getHost("SASL_PLAINTEXT://mydomain.com:8080"));
+assertEquals("MyDomain.com", getHost("SASL_PLAINTEXT://MyDomain.com:8080"));
+assertEquals("My_Domain.com", getHost("SASL_PLAINTEXT://My_Domain.com:8080"));
 assertEquals("::1", getHost("[::1]:1234"));
 assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
 assertEquals("2001:DB8:85A3:8D3:1319:8A2E:370:7348", getHost("PLAINTEXT://[2001:DB8:85A3:8D3:1319:8A2E:370:7348]:5678"));
 assertEquals("fe80::b1da:69ca:57f7:63d8%3", getHost("PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:5678"));
+assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("SASL_PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));

Review Comment:
   @frankvicky, we have 4 kinds of security protocols; let's test all of them. I think we can use `ParameterizedTest` here:
   
   ```
   @ParameterizedTest
   @CsvSource(value = {"PLAINTEXT", "SASL_PLAINTEXT", ...})
   ```
   
   ref: https://kafka.apache.org/documentation/#brokerconfigs_security.inter.broker.protocol






Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-12 Thread via GitHub


adixitconfluent commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1637579381


##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -310,126 +485,361 @@ private void initialize() {
 // TODO: Provide implementation to initialize the share partition.
 }
 
+private AcquiredRecords acquireNewBatchRecords(
+String memberId,
+long firstOffset,
+long lastOffset
+) {
+lock.writeLock().lock();
+try {
+// Schedule acquisition lock timeout for the batch.
+AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset);
+// Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
+cachedState.put(firstOffset, new InFlightBatch(
+memberId,
+firstOffset,
+lastOffset,
+RecordState.ACQUIRED,
+1,
+timerTask));
+// if the cachedState was empty before acquiring the new batches then startOffset needs to be updated
+if (cachedState.firstKey() == firstOffset)  {
+startOffset = firstOffset;
+}
+endOffset = lastOffset;
+return new AcquiredRecords()
+.setFirstOffset(firstOffset)
+.setLastOffset(lastOffset)
+.setDeliveryCount((short) 1);
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+private void acquireSubsetBatchRecords(
+String memberId,
+long requestFirstOffset,
+long requestLastOffset,
+InFlightBatch inFlightBatch,
+List<AcquiredRecords> result
+) {
+lock.writeLock().lock();
+try {
+for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
+// For the first batch which might have offsets prior to the request base
+// offset i.e. cached batch of 10-14 offsets and request batch of 12-13.
+if (offsetState.getKey() < requestFirstOffset) {
+continue;
+}
+
+if (offsetState.getKey() > requestLastOffset) {
+// No further offsets to process.
+break;
+}
+
+if (offsetState.getValue().state != RecordState.AVAILABLE) {
+log.trace("The offset is not available skipping, offset: {} batch: {}"
++ " for the share group: {}-{}", offsetState.getKey(), inFlightBatch,
+groupId, topicIdPartition);
+continue;
+}
+
+InFlightState updateResult =  offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount,
+memberId);
+if (updateResult == null) {
+log.trace("Unable to acquire records for the offset: {} in batch: {}"
++ " for the share group: {}-{}", offsetState.getKey(), inFlightBatch,
+groupId, topicIdPartition);
+continue;
+}
+// Schedule acquisition lock timeout for the offset.
+AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey());
+// Update acquisition lock timeout task for the offset.
+offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
+
+// TODO: Maybe we can club the continuous offsets here.
+result.add(new AcquiredRecords()
+.setFirstOffset(offsetState.getKey())
+.setLastOffset(offsetState.getKey())
+.setDeliveryCount((short) offsetState.getValue().deliveryCount));
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
 /**
- * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records.
+ * Check if the in-flight batch is a full match with the request offsets. The full match represents
+ * complete overlap of the in-flight batch with the request offsets.
+ *
+ * @param inFlightBatch The in-flight batch to check for full match.
+ * @param firstOffsetToCompare The first offset of the request batch.
+ * @param lastOffsetToCompare The last offset of the request batch.
+ *
+ * @return True if the in-flight batch is a full match with the request offsets, false otherwise.
  */
-private static class InFlightBatch {
-/**
- * The offset of the first record in the batch that is fetched from the log.
- */
+private boolean checkForFullMatch(InFlightBatch inFlightBatch, long 

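The loop in `acquireSubsetBatchRecords` narrows one cached batch down to the requested offsets and acquires only those still `AVAILABLE`, and its `TODO` suggests merging contiguous offsets. A minimal sketch of both ideas on a simplified model follows, assuming a `NavigableMap` keyed by offset; `OffsetState` and `Range` are hypothetical stand-ins (not Kafka's `InFlightState` or `AcquiredRecords`), and `maxDeliveryCount` and the acquisition-lock timers are ignored. Adjacent offsets are merged only when their delivery counts match, because each `AcquiredRecords` entry in the diff carries a single delivery count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified, hypothetical model of per-offset acquisition; not Kafka's classes.
final class SubsetAcquireSketch {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    static final class OffsetState {
        State state;
        int deliveryCount;

        OffsetState(State state, int deliveryCount) {
            this.state = state;
            this.deliveryCount = deliveryCount;
        }
    }

    // An acquired offset range [first, last] sharing one delivery count.
    static final class Range {
        final long first;
        final long last;
        final int deliveryCount;

        Range(long first, long last, int deliveryCount) {
            this.first = first;
            this.last = last;
            this.deliveryCount = deliveryCount;
        }

        @Override
        public String toString() {
            return first + "-" + last + " (deliveryCount=" + deliveryCount + ")";
        }
    }

    // Acquire every AVAILABLE offset of the cached batch in [requestFirst, requestLast],
    // merging contiguous offsets that end up with the same delivery count.
    static List<Range> acquire(NavigableMap<Long, OffsetState> batch, long requestFirst, long requestLast) {
        List<Range> result = new ArrayList<>();
        // subMap bounds the iteration, standing in for the explicit
        // "continue below the request / break above it" checks in the diff.
        for (Map.Entry<Long, OffsetState> entry : batch.subMap(requestFirst, true, requestLast, true).entrySet()) {
            OffsetState s = entry.getValue();
            if (s.state != State.AVAILABLE) {
                continue; // already acquired or acknowledged: skip it
            }
            s.state = State.ACQUIRED;
            s.deliveryCount++;
            long offset = entry.getKey();
            int lastIdx = result.size() - 1;
            Range prev = lastIdx >= 0 ? result.get(lastIdx) : null;
            if (prev != null && prev.last + 1 == offset && prev.deliveryCount == s.deliveryCount) {
                result.set(lastIdx, new Range(prev.first, offset, prev.deliveryCount));
            } else {
                result.add(new Range(offset, offset, s.deliveryCount));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Cached batch 10-14 and a request for 12-13, as in the code comment.
        NavigableMap<Long, OffsetState> batch = new TreeMap<>();
        for (long offset = 10; offset <= 14; offset++) {
            batch.put(offset, new OffsetState(State.AVAILABLE, 0));
        }
        System.out.println(acquire(batch, 12, 13)); // [12-13 (deliveryCount=1)]
    }
}
```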
Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-12 Thread via GitHub


adixitconfluent commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1637574165


##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -54,6 +60,7 @@ public class SharePartition {
  * The state of the records determines if the records should be re-delivered, move the next fetch
  * offset, or be state persisted to disk.
  */
+// Visible for testing
 public enum RecordState {

Review Comment:
   Can we make it package-private (default access) instead of `public`?



##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -310,126 +485,361 @@ private void initialize() {
 // TODO: Provide implementation to initialize the share partition.
 }
 
+private AcquiredRecords acquireNewBatchRecords(
+String memberId,
+long firstOffset,
+long lastOffset
+) {
+lock.writeLock().lock();
+try {
+// Schedule acquisition lock timeout for the batch.
+AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset);
+// Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
+cachedState.put(firstOffset, new InFlightBatch(
+memberId,
+firstOffset,
+lastOffset,
+RecordState.ACQUIRED,
+1,
+timerTask));
+// if the cachedState was empty before acquiring the new batches then startOffset needs to be updated
+if (cachedState.firstKey() == firstOffset)  {
+startOffset = firstOffset;
+}
+endOffset = lastOffset;
+return new AcquiredRecords()
+.setFirstOffset(firstOffset)
+.setLastOffset(lastOffset)
+.setDeliveryCount((short) 1);
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+private void acquireSubsetBatchRecords(
+String memberId,
+long requestFirstOffset,
+long requestLastOffset,
+InFlightBatch inFlightBatch,
+List<AcquiredRecords> result
+) {
+lock.writeLock().lock();
+try {
+for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
+// For the first batch which might have offsets prior to the request base
+// offset i.e. cached batch of 10-14 offsets and request batch of 12-13.
+if (offsetState.getKey() < requestFirstOffset) {
+continue;
+}
+
+if (offsetState.getKey() > requestLastOffset) {
+// No further offsets to process.
+break;
+}
+
+if (offsetState.getValue().state != RecordState.AVAILABLE) {
+log.trace("The offset is not available skipping, offset: {} batch: {}"
++ " for the share group: {}-{}", offsetState.getKey(), inFlightBatch,
+groupId, topicIdPartition);
+continue;
+}
+
+InFlightState updateResult =  offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount,
+memberId);
+if (updateResult == null) {
+log.trace("Unable to acquire records for the offset: {} in batch: {}"
++ " for the share group: {}-{}", offsetState.getKey(), inFlightBatch,
+groupId, topicIdPartition);
+continue;
+}
+// Schedule acquisition lock timeout for the offset.
+AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey());
+// Update acquisition lock timeout task for the offset.
+offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
+
+// TODO: Maybe we can club the continuous offsets here.
+result.add(new AcquiredRecords()
+.setFirstOffset(offsetState.getKey())
+.setLastOffset(offsetState.getKey())
+.setDeliveryCount((short) offsetState.getValue().deliveryCount));
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
 /**
- * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records.
+ * Check if the in-flight batch is a full match with the request offsets. The full match represents
+ * complete overlap of the in-flight batch with the request offsets.
+ *
+ * @param inFlightBatch The in-flight batch to check for full match.
+ * @param firstOffsetToCompare The first offset of the request batch.
+ * @param 

Re: [PR] KAFKA-16948: Reset tier lag metrics on becoming follower [kafka]

2024-06-12 Thread via GitHub


kamalcph commented on PR #16321:
URL: https://github.com/apache/kafka/pull/16321#issuecomment-2164410837

   @dopuskh3 
   Call for review. PTAL.





[PR] KAFKA-16948: Reset tier lag metrics on becoming follower [kafka]

2024-06-12 Thread via GitHub


kamalcph opened a new pull request, #16321:
URL: https://github.com/apache/kafka/pull/16321

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16948) Reset tier lag metrics on becoming follower

2024-06-12 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16948:


 Summary: Reset tier lag metrics on becoming follower
 Key: KAFKA-16948
 URL: https://issues.apache.org/jira/browse/KAFKA-16948
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


Tier lag metrics such as remoteCopyLagBytes and remoteCopyLagSegments are sometimes not cleared when the node transitions from leader to follower after a rolling restart.
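A minimal sketch of the idea in the ticket title, assuming a per-partition holder for the two lag values that is zeroed when the partition becomes a follower. In tiered storage only the leader copies log segments to remote storage, so a follower has no copy lag of its own to report. The class and method names are hypothetical, not Kafka's remote log manager or metrics APIs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified holder for per-partition tier lag values;
// not Kafka's RemoteLogManager or its metrics classes.
final class TierLagMetricsSketch {
    private static final class Lag {
        volatile long remoteCopyLagBytes;
        volatile long remoteCopyLagSegments;
    }

    private final Map<String, Lag> lagByPartition = new ConcurrentHashMap<>();

    // Updated by the leader while it copies log segments to remote storage.
    void recordCopyLag(String partition, long bytes, long segments) {
        Lag lag = lagByPartition.computeIfAbsent(partition, p -> new Lag());
        lag.remoteCopyLagBytes = bytes;
        lag.remoteCopyLagSegments = segments;
    }

    // Called on the leader -> follower transition so the last values recorded
    // while this node was leader are not reported indefinitely.
    void onBecomeFollower(String partition) {
        Lag lag = lagByPartition.get(partition);
        if (lag != null) {
            lag.remoteCopyLagBytes = 0L;
            lag.remoteCopyLagSegments = 0L;
        }
    }

    long remoteCopyLagBytes(String partition) {
        Lag lag = lagByPartition.get(partition);
        return lag == null ? 0L : lag.remoteCopyLagBytes;
    }

    long remoteCopyLagSegments(String partition) {
        Lag lag = lagByPartition.get(partition);
        return lag == null ? 0L : lag.remoteCopyLagSegments;
    }
}
```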



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16747: Implemented share sessions and contexts for share fetch requests [kafka]

2024-06-12 Thread via GitHub


adixitconfluent commented on code in PR #16263:
URL: https://github.com/apache/kafka/pull/16263#discussion_r1637556905


##
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##
@@ -0,0 +1,916 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import kafka.server.ReplicaManager;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
+import org.apache.kafka.common.errors.ShareSessionNotFoundException;
+import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.requests.ShareFetchMetadata;
+import org.apache.kafka.common.requests.ShareFetchRequest;
+import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.share.ShareSessionCache;
+import org.apache.kafka.server.share.ShareSessionKey;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+@Timeout(120)
+public class SharePartitionManagerTest {
+
+private static final int RECORD_LOCK_DURATION_MS = 3;
+private static final int MAX_DELIVERY_COUNT = 5;
+private static final short MAX_IN_FLIGHT_MESSAGES = 200;
+
+private final List emptyPartList = Collections.unmodifiableList(new ArrayList<>());
+
+@Test
+public void testNewContextReturnsFinalContext() {
+SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build();
+
+ShareFetchMetadata newReqMetadata = new ShareFetchMetadata(Uuid.ZERO_UUID, -1);
+ShareFetchContext shareFetchContext = sharePartitionManager.newContext("grp", new HashMap<>(), new ArrayList<>(), newReqMetadata);
+assertEquals(shareFetchContext.getClass(), SharePartitionManager.FinalContext.class);
+
+// If the final fetch request has topics to add, it should fail as an invalid request
+Uuid topicId = Uuid.randomUuid();
+Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData = new HashMap<>();
+shareFetchData.put(new TopicIdPartition(topicId, new TopicPartition("foo", 0)),
+new ShareFetchRequest.SharePartitionData(topicId, 4000));
+assertThrows(InvalidRequestException.class,
+() -> sharePartitionManager.newContext("grp", shareFetchData, new ArrayList<>(), new ShareFetchMetadata(Uuid.ZERO_UUID, -1)));
+
+// shareFetchData is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements.
+// New context should be created successfully
+shareFetchData.clear();
+shareFetchData.put(new TopicIdPartition(topicId, new TopicPartition("foo", 0)),
+new ShareFetchRequest.SharePartitionData(topicId, 0));
+shareFetchContext = sharePartitionManager.newContext("grp", shareFetchData, new ArrayList<>(), newReqMetadata);
+assertEquals(shareFetchContext.getClass(), SharePartitionManager.FinalContext.class);
+}
+
+@Test
+public void testNewContext() {

Review Comment:
   Yes, I can only segregate out one of the cases from the following test because for others I need them to work one after

Re: [PR] KAFKA-16747: Implemented share sessions and contexts for share fetch requests [kafka]

2024-06-12 Thread via GitHub


adixitconfluent commented on code in PR #16263:
URL: https://github.com/apache/kafka/pull/16263#discussion_r1637549886


##
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import kafka.server.FetchSession;
+import kafka.server.ReplicaManager;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ShareSessionNotFoundException;
+import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
+import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ShareFetchMetadata;
+import org.apache.kafka.common.requests.ShareFetchRequest;
+import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.share.CachedSharePartition;
+import org.apache.kafka.server.share.ShareSession;
+import org.apache.kafka.server.share.ShareSessionCache;
+import org.apache.kafka.server.share.ShareSessionKey;
+import org.apache.kafka.storage.internals.log.FetchParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
+ * It is responsible for fetching messages from the log and acknowledging the messages.
+ */
+public class SharePartitionManager implements AutoCloseable {
+
+private final static Logger log = LoggerFactory.getLogger(SharePartitionManager.class);
+
+/**
+ * The partition cache map is used to store the SharePartition objects for each share group topic-partition.
+ */
+private final Map partitionCacheMap;
+
+/**
+ * The replica manager is used to fetch messages from the log.
+ */
+private final ReplicaManager replicaManager;
+
+/**
+ * The time instance is used to get the current time.
+ */
+private final Time time;
+
+/**
+ * The share session cache stores the share sessions.
+ */
+private final ShareSessionCache cache;
+
+/**
+ * The fetch queue stores the share fetch requests that are waiting to be processed.
+ */
+private final ConcurrentLinkedQueue fetchQueue;
+
+/**
+ * The process fetch queue lock is used to ensure that only one thread is processing the fetch queue at a time.
+ */
+private final AtomicBoolean processFetchQueueLock;
+
+/**
+ * The record lock duration is the time in milliseconds that a record lock is held for.
+ */
+private final int recordLockDurationMs;
+
+/**
+ * The max in flight messages is the maximum number of messages that can be in flight at any one time per share-partition.
+ */
+private final int maxInFlightMessages;
+
+/**
+ * The max delivery count is the maximum number of times a message can be delivered before it is considered to be archived.
+ */
+private final int maxDeliveryCount;
+
+public SharePartitionManager(
+ReplicaManager replicaManager,
+Time time,
+ShareSessionCache cache,
+int recordLockDurationMs,
+int maxDeliveryCount,
+int maxInFlightMessages
+) {
+this(replicaManager, time, cache, new ConcurrentHashMap<>(), recordLockDurationMs, maxDeliveryCount, maxInFlightMessages);
+}
+
+SharePartitionManager(
+ReplicaManager replicaManager,
+Time time,
+ShareSessionCache cache,
+Map partitionCacheMap,
+int recordLockDurationMs,
+ 

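The `fetchQueue` / `processFetchQueueLock` pair described in the class above follows a common single-drainer pattern: any thread may enqueue, but only the thread that wins `compareAndSet(false, true)` drains the queue. A minimal, generic sketch follows, with hypothetical names and a list standing in for real fetch processing; it is not `SharePartitionManager`'s implementation. The outer loop re-checks the queue after releasing the flag so that a request enqueued while another thread was draining is not left behind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical single-drainer sketch; not SharePartitionManager's actual code.
final class FetchQueueSketch<T> {
    private final ConcurrentLinkedQueue<T> fetchQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean processFetchQueueLock = new AtomicBoolean(false);
    // Only mutated by the thread currently holding processFetchQueueLock.
    private final List<T> processed = new ArrayList<>();

    void enqueue(T request) {
        fetchQueue.add(request);
        maybeProcessFetchQueue();
    }

    private void maybeProcessFetchQueue() {
        // Re-check after releasing the flag: another thread may have enqueued
        // while we were draining and failed to acquire the flag itself.
        while (!fetchQueue.isEmpty() && processFetchQueueLock.compareAndSet(false, true)) {
            try {
                T request;
                while ((request = fetchQueue.poll()) != null) {
                    processed.add(request); // stand-in for real fetch processing
                }
            } finally {
                processFetchQueueLock.set(false);
            }
        }
    }

    // Safe to read once all producer threads have been joined.
    List<T> processed() {
        return processed;
    }
}
```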
Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-12 Thread via GitHub


xiaoqingwanga commented on PR #16303:
URL: https://github.com/apache/kafka/pull/16303#issuecomment-2164374450

   @gharris1727 Thank you very much for your suggestion! I have made another commit resolving the issue mentioned above.





Re: [PR] KAFKA-15045: (KIP-924 pt. 24) Correct stateful task flagging [kafka]

2024-06-12 Thread via GitHub


apourchet closed pull request #16314: KAFKA-15045: (KIP-924 pt. 24) Correct stateful task flagging
URL: https://github.com/apache/kafka/pull/16314





[PR] [WIP] Delayed remote list offsets [kafka]

2024-06-12 Thread via GitHub


kamalcph opened a new pull request, #16320:
URL: https://github.com/apache/kafka/pull/16320

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic [kafka]

2024-06-12 Thread via GitHub


C0urante merged PR #16306:
URL: https://github.com/apache/kafka/pull/16306





Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-06-12 Thread via GitHub


github-actions[bot] commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-2164295049

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] KAFKA-16531: calculate check-quorum when leader is not in voter set [kafka]

2024-06-12 Thread via GitHub


showuon commented on PR #16211:
URL: https://github.com/apache/kafka/pull/16211#issuecomment-2164292176

   @chia7712, thanks for the review. PR updated in this commit: https://github.com/apache/kafka/pull/16211/commits/7db45c2e8f08b4a381717b4c6b7f466bd116cd4e . Thanks.





Re: [PR] KAFKA-16531: calculate check-quorum when leader is not in voter set [kafka]

2024-06-12 Thread via GitHub


showuon commented on code in PR #16211:
URL: https://github.com/apache/kafka/pull/16211#discussion_r1637455856


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -126,8 +127,13 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) {
  */
 public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) {
 updateFetchedVoters(id);
-// The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1
-int majority = voterStates.size() / 2;
+// The majority number of the voters. Ex: 3 voters, the value will be 2
+int majority = (int) ((double) (voterStates.size() + 1) / 2);

Review Comment:
   Ah, good catch! My test didn't work as expected. I've also updated the test to catch the issue. Thanks.
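The new comment in the hunk says 3 voters give 2. A small sketch (hypothetical class name) tabulating the removed expression, the added expression, and the textbook strict majority n/2 + 1 for comparison. For non-negative n, `(int) ((double) (n + 1) / 2)` equals plain integer division `(n + 1) / 2`; it matches n/2 + 1 for odd n and is one lower for even n, so whether it is the intended threshold depends on how it is compared against the fetched-voter count, which this hunk does not show.

```java
// Hypothetical helper for comparing the two expressions in the LeaderState diff.
final class QuorumMajoritySketch {
    // Removed line: majority of the voters excluding the leader.
    static int oldExpr(int voters) {
        return voters / 2;
    }

    // Added line, copied from the diff.
    static int newExpr(int voters) {
        return (int) ((double) (voters + 1) / 2);
    }

    // Same value as newExpr for non-negative inputs, without the double round-trip.
    static int newExprIntOnly(int voters) {
        return (voters + 1) / 2;
    }

    // Textbook strict majority of n voters, for comparison only.
    static int strictMajority(int voters) {
        return voters / 2 + 1;
    }

    public static void main(String[] args) {
        System.out.println("voters old new strictMajority");
        for (int n = 1; n <= 5; n++) {
            System.out.printf("%6d %3d %3d %14d%n", n, oldExpr(n), newExpr(n), strictMajority(n));
        }
    }
}
```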






Re: [PR] KAFKA-16531: calculate check-quorum when leader is not in voter set [kafka]

2024-06-12 Thread via GitHub


showuon commented on code in PR #16211:
URL: https://github.com/apache/kafka/pull/16211#discussion_r1637455387


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -111,9 +111,10 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) {
 long remainingMs = checkQuorumTimer.remainingMs();
 if (remainingMs == 0) {
 log.info(
-"Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+"Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}, and voters are {}",
 checkQuorumTimeoutMs,
-fetchedVoters);
+fetchedVoters,
+voterStates);

Review Comment:
   Updated. Thanks.






[jira] [Commented] (KAFKA-12480) Reuse bootstrap servers in clients when last alive broker in cluster metadata is unavailable

2024-06-12 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854611#comment-17854611
 ] 

Ivan Yurchenko commented on KAFKA-12480:


With [KIP-899|https://cwiki.apache.org/confluence/display/KAFKA/KIP-899:+Allow+producer+and+consumer+clients+to+rebootstrap] accepted and [KAFKA-8206|https://issues.apache.org/jira/browse/KAFKA-8206] implemented, please consider resolving this jira.

> Reuse bootstrap servers in clients when last alive broker in cluster metadata is unavailable
> 
>
> Key: KAFKA-12480
> URL: https://issues.apache.org/jira/browse/KAFKA-12480
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Ron Dagostino
>Priority: Major
>
> https://issues.apache.org/jira/browse/KAFKA-12455 documented how a Java 
> client can temporarily lose connectivity to a 2-broker cluster that is 
> undergoing a roll because the client will repeatedly retry connecting to the 
> last alive broker that it knows about in the cluster metadata even when that 
> broker is unavailable.  The client could potentially fall back to its
> bootstrap brokers in this case and reconnect to the cluster more quickly.
> For example, assume a 2-broker cluster has broker IDs 1 and 2 and both appear 
> in the bootstrap servers for a consumer.  Assume broker 1 rolls such that the 
> Java consumer receives a new METADATA response and only knows about broker 2 
> being alive, and then broker 2 rolls before the consumer gets a new METADATA 
> response indicating that broker 1 is also alive.  At this point the Java 
> consumer will keep retrying broker 2, and it will not reconnect to the 
> cluster unless/until broker 2 becomes available -- or the client itself is 
> restarted so it can use its bootstrap servers again.  Another possibility is 
> to fall back to the full bootstrap servers list when the last alive broker 
> becomes unavailable.
> I believe librdkafka-based clients may perform this fallback, though I am not 
> certain.  We should consider it for Java clients.
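Here is a minimal sketch of the fallback described above. It assumes a client that tracks which of its known brokers are currently unreachable. The class and method names (`RebootstrapSketch`, `candidates`) are hypothetical. This is not the KIP-899 implementation, only an illustration of the idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the Kafka client API.
final class RebootstrapSketch {
    private RebootstrapSketch() {}

    /**
     * Returns the addresses a client should try next: the brokers from its cached
     * metadata that are not known to be unavailable or, when every known broker
     * is unavailable, the original bootstrap servers.
     */
    static List<String> candidates(List<String> knownBrokers,
                                   Set<String> unavailable,
                                   List<String> bootstrapServers) {
        List<String> usable = new ArrayList<>();
        for (String broker : knownBrokers) {
            if (!unavailable.contains(broker)) {
                usable.add(broker);
            }
        }
        // Fall back to the full bootstrap list instead of retrying a dead broker forever.
        return usable.isEmpty() ? List.copyOf(bootstrapServers) : usable;
    }
}
```

Take the two-broker scenario from the ticket. The cached metadata lists only broker 2. Once broker 2 is marked unavailable, `candidates(List.of("broker2:9092"), Set.of("broker2:9092"), List.of("broker1:9092", "broker2:9092"))` returns both bootstrap addresses, so the client can reach broker 1 again.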



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13467) Clients never refresh cached bootstrap IPs

2024-06-12 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854612#comment-17854612
 ] 

Ivan Yurchenko commented on KAFKA-13467:


With 
[KIP-899|https://cwiki.apache.org/confluence/display/KAFKA/KIP-899:+Allow+producer+and+consumer+clients+to+rebootstrap]
 accepted and [KAFKA-8206|https://issues.apache.org/jira/browse/KAFKA-8206] 
implemented, please consider resolving this jira.

> Clients never refresh cached bootstrap IPs
> --
>
> Key: KAFKA-13467
> URL: https://issues.apache.org/jira/browse/KAFKA-13467
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, network
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Follow up ticket to https://issues.apache.org/jira/browse/KAFKA-13405.
> For certain broker rolling upgrade scenarios, it would be beneficial to 
> expire cached bootstrap server IP addresses and re-resolve those IPs to 
> allow clients to re-connect to the cluster without the need to restart the 
> client.
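One way to picture the proposed behaviour is a per-host cache of resolved addresses that expires after a TTL, so that a later lookup re-resolves the bootstrap hostname. The sketch below is hypothetical: `ExpiringBootstrapCache` is not a Kafka class. The resolver and clock are injected so the expiry logic can be exercised without real DNS.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch: cache resolved bootstrap IPs per host and re-resolve after a TTL.
final class ExpiringBootstrapCache {
    private final Function<String, List<String>> resolver; // hostname -> IP addresses
    private final LongSupplier clockMs;                      // current time in milliseconds
    private final long ttlMs;                                // how long a resolution stays valid
    private final Map<String, List<String>> cached = new HashMap<>();
    private final Map<String, Long> resolvedAtMs = new HashMap<>();

    ExpiringBootstrapCache(Function<String, List<String>> resolver, LongSupplier clockMs, long ttlMs) {
        this.resolver = resolver;
        this.clockMs = clockMs;
        this.ttlMs = ttlMs;
    }

    /** Returns cached addresses for host, re-resolving once the entry is older than the TTL. */
    synchronized List<String> addresses(String host) {
        long now = clockMs.getAsLong();
        Long resolvedAt = resolvedAtMs.get(host);
        if (resolvedAt == null || now - resolvedAt >= ttlMs) {
            cached.put(host, List.copyOf(resolver.apply(host)));
            resolvedAtMs.put(host, now);
        }
        return cached.get(host);
    }
}
```

In a real client, the resolver could wrap `InetAddress.getAllByName(host)`. The JVM also keeps its own DNS cache, controlled by the `networkaddress.cache.ttl` security property. An application-level TTL therefore only helps if that JVM cache expires as well.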





[jira] [Commented] (KAFKA-16781) Expose advertised.listeners in controller node

2024-06-12 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854610#comment-17854610
 ] 

TengYao Chi commented on KAFKA-16781:
-

Hi [~tinaselenge] feel free to take this if you wish :)

> Expose advertised.listeners in controller node
> --
>
> Key: KAFKA-16781
> URL: https://issues.apache.org/jira/browse/KAFKA-16781
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: TengYao Chi
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> After 
> [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration],
>  we allow clients to talk to the KRaft controller node directly. But unlike 
> broker nodes, we don't allow users to configure advertised.listeners for clients 
> to connect to. Without this config, the client cannot connect to the 
> controller node if the controller sits behind a NAT network while the 
> client is in the external network.





[jira] [Assigned] (KAFKA-14507) Add ConsumerGroupPrepareAssignment API

2024-06-12 Thread dujian0068 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dujian0068 reassigned KAFKA-14507:
--

Assignee: dujian0068

> Add ConsumerGroupPrepareAssignment API
> --
>
> Key: KAFKA-14507
> URL: https://issues.apache.org/jira/browse/KAFKA-14507
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: dujian0068
>Priority: Major
>






[jira] [Assigned] (KAFKA-14705) Remove deprecated options and redirections

2024-06-12 Thread dujian0068 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dujian0068 reassigned KAFKA-14705:
--

Assignee: dujian0068

> Remove deprecated options and redirections
> --
>
> Key: KAFKA-14705
> URL: https://issues.apache.org/jira/browse/KAFKA-14705
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Federico Valeri
>Assignee: dujian0068
>Priority: Major
> Fix For: 4.0.0
>
>
> We can use this task to track tools cleanup for the next major release 
> (4.0.0).
> 1. Redirections to be removed:
>  - core/src/main/scala/kafka/tools/JmxTool
>  - core/src/main/scala/kafka/tools/ClusterTool
>  - core/src/main/scala/kafka/tools/StateChangeLogMerger
>  - core/src/main/scala/kafka/tools/EndToEndLatency
>  - core/src/main/scala/kafka/admin/FeatureCommand
>  - core/src/main/scala/kafka/tools/StreamsResetter
> 2. Deprecated tools to be removed:
>  - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger
> 3. TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" 
> should be moved to "tools" once we get rid of MirrorMaker1 dependency.
> 4. We should also get rid of many deprecated options across all tools, 
> including not migrated tools.





[jira] [Commented] (KAFKA-16941) Flaky test - testDynamicBrokerConfigUpdateUsingKraft [1] Type=Raft-Combined, MetadataVersion=4.0-IV0,Security=PLAINTEXT – kafka.admin.ConfigCommandIntegrationTest

2024-06-12 Thread Lin Siyuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854604#comment-17854604
 ] 

Lin Siyuan commented on KAFKA-16941:


I'm a newbie and will do my best to find the problem, but it may take a long 
time. If others can find the problem more quickly, you are welcome to join in.

> Flaky test - testDynamicBrokerConfigUpdateUsingKraft [1] Type=Raft-Combined, 
> MetadataVersion=4.0-IV0,Security=PLAINTEXT – 
> kafka.admin.ConfigCommandIntegrationTest
> --
>
> Key: KAFKA-16941
> URL: https://issues.apache.org/jira/browse/KAFKA-16941
> Project: Kafka
>  Issue Type: Test
>Reporter: Igor Soarez
>Priority: Minor
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16077/4/tests/
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 5000. 
> [listener.name.internal.ssl.keystore.location] are not updated ==> expected: 
> <true> but was: <false>
>     at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
>     at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
>     at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
>     at 
> kafka.admin.ConfigCommandIntegrationTest.verifyConfigDefaultValue(ConfigCommandIntegrationTest.java:519)
>     at 
> kafka.admin.ConfigCommandIntegrationTest.deleteAndVerifyConfig(ConfigCommandIntegrationTest.java:514)
>     at 
> kafka.admin.ConfigCommandIntegrationTest.testDynamicBrokerConfigUpdateUsingKraft(ConfigCommandIntegrationTest.java:237)
>  {code}





Re: [PR] KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port [kafka]

2024-06-12 Thread via GitHub


frankvicky commented on PR #16319:
URL: https://github.com/apache/kafka/pull/16319#issuecomment-2164259901

   Hi @zhaochun-ma, @showuon 
   I have fixed the bug where the regex couldn't correctly parse connection strings starting with `SASL_PLAINTEXT`. PTAL.
   Special thanks to @zhaochun-ma for finding this issue.





[PR] KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port [kafka]

2024-06-12 Thread via GitHub


frankvicky opened a new pull request, #16319:
URL: https://github.com/apache/kafka/pull/16319

   In the previous PR (#16048), I mistakenly excluded the underscore (_) from the set of valid characters for the protocol, so connection strings using SASL_PLAINTEXT could no longer be parsed correctly. This bug fix addresses the issue and includes corresponding tests.
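Here is a hypothetical host/port parser that illustrates the failure mode. Its pattern is an assumption modelled on the description above, not the exact regex in `Utils`. The optional protocol prefix accepts `_`, so `SASL_PLAINTEXT://` and `SASL_SSL://` are recognised. Dropping `_` from that prefix class makes `getHost("SASL_PLAINTEXT://host:9092")` return `null`, which is the bug described here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser; the pattern is illustrative, not Kafka's actual Utils regex.
final class HostPortParser {
    private HostPortParser() {}

    // Optional "PROTOCOL://" prefix: a letter followed by letters, digits, '+', '-', '.'
    // or '_' (the underscore is what SASL_PLAINTEXT and SASL_SSL need). Then a host,
    // optionally a bracketed IPv6 literal, then ":port".
    private static final Pattern HOST_PORT = Pattern.compile(
        "^(?:[a-zA-Z][a-zA-Z\\d+\\-._]*://)?\\[?([0-9a-zA-Z\\-._%:]+)\\]?:([0-9]+)$");

    /** Returns the host part, or null if the address does not match. */
    static String getHost(String address) {
        Matcher m = HOST_PORT.matcher(address);
        return m.matches() ? m.group(1) : null;
    }

    /** Returns the port, or null if the address does not match. */
    static Integer getPort(String address) {
        Matcher m = HOST_PORT.matcher(address);
        return m.matches() ? Integer.valueOf(m.group(2)) : null;
    }
}
```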
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16913) Support external schemas in JSONConverter

2024-06-12 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854601#comment-17854601
 ] 

Ganesh Sadanala commented on KAFKA-16913:
-

Is there any documentation that needs to be updated with this change, or is that 
a separate task?

> Support external schemas in JSONConverter
> -
>
> Key: KAFKA-16913
> URL: https://issues.apache.org/jira/browse/KAFKA-16913
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Priyanka K U
>Assignee: Ganesh Sadanala
>Priority: Minor
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
>  KIP-1054: Support external schemas in JSONConverter : 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter





[jira] [Commented] (KAFKA-16913) Support external schemas in JSONConverter

2024-06-12 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854600#comment-17854600
 ] 

Ganesh Sadanala commented on KAFKA-16913:
-

The PR is ready for review.






Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-12 Thread via GitHub


kirktrue commented on PR #16310:
URL: https://github.com/apache/kafka/pull/16310#issuecomment-2164238647

   @AndrewJSchofield @cadonna @lianetm @philipnee—please review this PR. It's 
an alternative take on #16241 that seems simpler and not fraught with peril.
   
   Thanks!





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-12 Thread via GitHub


kirktrue commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1637405808


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1983,6 +2009,10 @@ SubscriptionState subscriptions() {
 return subscriptions;
 }
 
+FetchCommittedOffsetsEvent pendingOffsetFetch() {

Review Comment:
   That made sense, so I changed it. Thanks!






[PR] KAFKA-15045: (KIP-924 pt. 24) internal TaskAssignor rename to LegacyTaskAssignor [kafka]

2024-06-12 Thread via GitHub


apourchet opened a new pull request, #16318:
URL: https://github.com/apache/kafka/pull/16318

   Since the new public TaskAssignor API shares its name with the internal class, this rename prevents users from confusing the internal definition with the public one.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-10787: Apply spotless to `raft` module [kafka]

2024-06-12 Thread via GitHub


gongxuanzhang commented on PR #16278:
URL: https://github.com/apache/kafka/pull/16278#issuecomment-2164215470

   > @gongxuanzhang you need to fix it again :)
   
   OK, I'll fix it periodically before merging.





Re: [PR] KAFKA-16689: Move LogValidatorTest to storage module [kafka]

2024-06-12 Thread via GitHub


chia7712 commented on code in PR #16167:
URL: https://github.com/apache/kafka/pull/16167#discussion_r1637373367


##
storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java:
##
@@ -0,0 +1,2100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.nio.ByteBuffer;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import kafka.server.BrokerTopicStats;
+import kafka.server.RequestLocal;
+import kafka.log.UnifiedLog;
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.errors.InvalidTimestampException;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordValidationStats;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import com.yammer.metrics.core.MetricName;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogValidatorTest {
+Time time = Time.SYSTEM;

Review Comment:
   please add `private final` modifiers




Re: [PR] [MINOR]:Update visibility from public to protected and adjust the order in BuiltInPartitioner [kafka]

2024-06-12 Thread via GitHub


gongxuanzhang commented on code in PR #16277:
URL: https://github.com/apache/kafka/pull/16277#discussion_r1637379530


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -1033,10 +1033,18 @@ public Deque getDeque(TopicPartition tp) {
  * Get the deque for the given topic-partition, creating it if necessary.
  */
 private Deque getOrCreateDeque(TopicPartition tp) {
-TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(), k -> new TopicInfo(logContext, k, batchSize));
+TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(),
+k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));
 return topicInfo.batches.computeIfAbsent(tp.partition(), k -> new ArrayDeque<>());
 }
 
+/**
+ * Subclass can custom {@link BuiltInPartitioner}

Review Comment:
   You are right, it doesn't seem to need explaining. I removed the docs.
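The diff under review replaces a direct constructor call with an overridable factory method, so a subclass can supply its own partitioner. Below is a minimal sketch of that pattern. Its simplified types are hypothetical: `Accumulator`, `Partitioner` and `String` batches stand in for `RecordAccumulator`, `BuiltInPartitioner` and the real batch type.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for RecordAccumulator and BuiltInPartitioner.
class Accumulator {
    static class Partitioner {
        final String label;
        Partitioner(String label) { this.label = label; }
    }

    static final class TopicInfo {
        final Partitioner partitioner;
        final Map<Integer, Deque<String>> batches = new ConcurrentHashMap<>();
        TopicInfo(Partitioner partitioner) { this.partitioner = partitioner; }
    }

    private final Map<String, TopicInfo> topicInfoMap = new ConcurrentHashMap<>();

    /** Gets the deque for a topic-partition, creating the topic's state lazily. */
    Deque<String> getOrCreateDeque(String topic, int partition) {
        TopicInfo info = topicInfoMap.computeIfAbsent(topic, t -> new TopicInfo(createPartitioner(t)));
        return info.batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
    }

    /** Factory hook: subclasses may override it to supply a customised partitioner. */
    protected Partitioner createPartitioner(String topic) {
        return new Partitioner("default-" + topic);
    }

    /** Returns the partitioner created for topic, or null if the topic was never seen. */
    Partitioner partitionerFor(String topic) {
        TopicInfo info = topicInfoMap.get(topic);
        return info == null ? null : info.partitioner;
    }
}
```

Making the hook `protected` rather than `public` limits it to subclasses, which matches the visibility change named in the PR title.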






[jira] [Commented] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port

2024-06-12 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854595#comment-17854595
 ] 

Luke Chen commented on KAFKA-16946:
---

[~frankvicky] , this is a blocker for v3.8.0. Hope we can fix it soon. Thanks.

> Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port
> -
>
> Key: KAFKA-16946
> URL: https://issues.apache.org/jira/browse/KAFKA-16946
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.8.0
>Reporter: Luke Chen
>Assignee: TengYao Chi
>Priority: Blocker
> Fix For: 3.8.0
>
>
> In KAFKA-16824, we tried to improve the regex for Utils.getHost/getPort, but 
> it now fails to parse SASL_PLAINTEXT://host:port. We need to fix it.





[jira] [Updated] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port

2024-06-12 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16946:
--
Fix Version/s: 3.8.0






[PR] KAFKA-16939: Revisit ConfigCommandIntegrationTest [kafka]

2024-06-12 Thread via GitHub


m1a2st opened a new pull request, #16317:
URL: https://github.com/apache/kafka/pull/16317

   https://issues.apache.org/jira/browse/KAFKA-16939 
   
   Some scenarios are wrong, so this PR fixes them.





Re: [PR] KAFKA-16942 Use ConcurrentHashMap in RecordAccumulator#nodeStats [kafka]

2024-06-12 Thread via GitHub


gongxuanzhang commented on PR #16305:
URL: https://github.com/apache/kafka/pull/16305#issuecomment-2164206281

   I agree. As Java has evolved, the performance of ConcurrentHashMap has indeed improved significantly.
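The PR title proposes a `ConcurrentHashMap` for `RecordAccumulator#nodeStats`. Below is a minimal sketch of how such a per-node map is typically used. The names (`NodeStatsRegistry`, `NodeLatencyStats` and its fields) are hypothetical. `ConcurrentHashMap.computeIfAbsent` applies the mapping function at most once per key. Concurrent callers therefore share one stats object per node without an external lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical registry illustrating a ConcurrentHashMap-backed per-node stats map.
final class NodeStatsRegistry {
    static final class NodeLatencyStats {
        final AtomicLong readyTimeMs;   // last time the node had sendable data
        final AtomicLong drainTimeMs;   // last time batches were drained to the node

        NodeLatencyStats(long nowMs) {
            this.readyTimeMs = new AtomicLong(nowMs);
            this.drainTimeMs = new AtomicLong(nowMs);
        }
    }

    private final ConcurrentMap<Integer, NodeLatencyStats> nodeStats = new ConcurrentHashMap<>();

    /** Returns the stats for nodeId, creating them atomically on first use. */
    NodeLatencyStats statsFor(int nodeId, long nowMs) {
        return nodeStats.computeIfAbsent(nodeId, id -> new NodeLatencyStats(nowMs));
    }

    /** Records a drain, e.g. after batches for the node were sent. */
    void recordDrain(int nodeId, long nowMs) {
        statsFor(nodeId, nowMs).drainTimeMs.set(nowMs);
    }
}
```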





Re: [PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]

2024-06-12 Thread via GitHub


mjsax commented on code in PR #16316:
URL: https://github.com/apache/kafka/pull/16316#discussion_r1637372406


##
docs/streams/upgrade-guide.html:
##
@@ -303,6 +303,62 @@ Streams API
   adds a new config default.client.supplier that allows to use a custom KafkaClientSupplier without any code changes.
 
 
+Streams API changes in 3.3.0
+
+  Kafka Streams does not send a "leave group" request when an instance is closed. This behavior implies
+  that a rebalance is delayed until max.poll.interval.ms passed.
+  https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group;>KIP-812
+  introduces KafkaStreams.close(CloseOptions) overload which allows to force an instance to leave the
+  group immediately.
+
+  Note: due to internal limitations, CloseOptions only works for static consumer groups at this point
+  (cf. https://issues.apache.org/jira/browse/KAFKA-16514;>KAFKA-16514 for more details and a fix in
+  some future release).
+
+
+
+  https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API;>KIP-820
+  adapts the PAPI type-safety improvement of KIP-478 into the DSL. The existing methods KStream.transform,
+  KStream.flatTransform, KStream.transformValues, and KStream.flatTransformValues
+  as well as all overloads of void KStream.process are deprecated in favor of the newly added methods
+  
+KStream<KOut,VOut> KStream.process(ProcessorSupplier, ...)
+KStream<K,VOut> KStream.processValues(FixedKeyProcessorSupplier, ...)
+  
+  Both new methods have multiple overloads and return a KStream instead of void as the
+  deprecated process() methods did. In addition, FixedKeyProcessor, FixedKeyRecord,
+  FixedKeyProcessorContext, and ContextualFixedKeyProcessor are introduced to guard against
+  disallowed key modification inside processValues(). Furthermore ProcessingContext is
+  added for a better interface hierarchy.
+
+
+
+  Emitting a windowed aggregation result only after a window is closed is currently supported via the
+  suppress() operator. However, suppress() uses an in-memory implementation and does not
+  support RocksDB. To close this gap,
+  https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced;>KIP-825
+  introduces "emit strategies" which are built into the aggregation operator directly to use the already existing
+  RocksDB store. TimeWindowedKStream.emitStrategy(EmitStrategy) and
+  SessionWindowedKStream.emitStrategy(EmitStrategy) allow to pick between "emit on window update" (default)
+  and "emit on window close" strategies. Additionally, a few new emit metrics are added, as well as a necessary
+  new method SessionStore.findSessions(long, long).
+
+
+
+  https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832;>KIP-834 allows to pause
+  and resume a Kafka Streams instance. Pausing implies that processing input records and executing punctuations will
+  be skipped; Kafka Streams will continue to poll to maintain its group membership and may commit offsets.
+  In addition to the new method KafkaStreams.pause() and KafkaStreams.resume(), it is also
+  supported to check if an instance is paused via KafkaStreams.isPaused() method.
+
+
+
+  To improve monitoring of Kafka Streams applications, https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093;>KIP-846
+  adds four new metrics bytes-consumed-total, records-consumed-total,
+  bytes-produced-total, and records-produced-total within a new topic level scope.

Review Comment:
   These metrics are already added to the `ops.html` KS metrics section.






[PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]

2024-06-12 Thread via GitHub


mjsax opened a new pull request, #16316:
URL: https://github.com/apache/kafka/pull/16316

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-12708 Rewrite org.apache.kafka.test.Microbenchmarks by JMH [kafka]

2024-06-12 Thread via GitHub


chia7712 commented on PR #16231:
URL: https://github.com/apache/kafka/pull/16231#issuecomment-2164203749

   @brandboat could you please remove `Microbenchmarks` from the suppression file?
   
   
https://github.com/apache/kafka/blob/6d1f8f8727e8f9c7071b83b7dd9feb840d8d661f/checkstyle/suppressions.xml#L61





Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-12 Thread via GitHub


philipnee commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1637370335


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1983,6 +2009,10 @@ SubscriptionState subscriptions() {
 return subscriptions;
 }
 
+FetchCommittedOffsetsEvent pendingOffsetFetch() {

Review Comment:
   would `boolean hasPendingOffsetFetch()` be sufficient for the test?






[jira] [Resolved] (KAFKA-16938) non-dynamic props gets corrupted due to circular reference between DynamicBrokerConfig and DynamicConfig

2024-06-12 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16938.

Resolution: Fixed

> non-dynamic props gets corrupted due to circular reference between 
> DynamicBrokerConfig and DynamicConfig
> 
>
> Key: KAFKA-16938
> URL: https://issues.apache.org/jira/browse/KAFKA-16938
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Blocker
> Fix For: 3.9.0
>
>
> DynamicBrokerConfig has a circular reference with DynamicConfig. The following 
> initialization order produces incorrect non-dynamic props [0]:
> 1. DynamicConfig is initializing -> brokerConfigs is created [1]
> 2. DynamicConfig is initializing -> calls 
> DynamicBrokerConfig.addDynamicConfigs(brokerConfigs) [2]
> 3. DynamicBrokerConfig is initializing -> nonDynamicProps: Set[String] = 
> KafkaConfig.configNames.toSet -- DynamicConfig.Broker.names.asScala [3]
> 4. DynamicConfig.Broker.names references `brokerConfigs`, but `brokerConfigs` 
> does not yet contain all dynamic props (see step 2), so nonDynamicProps is 
> created with incorrect contents.
> We should break the cycle by addressing the following tasks:
> 1. move `DynamicBrokerConfig.addDynamicConfigs` to `DynamicConfig.Broker`
> 2. move `DynamicBrokerConfig#nonDynamicProps` to `DynamicConfig.Broker`
> {code:scala}
> object DynamicConfig {
>   object Broker {
>     // Build the full set of dynamic broker configs in one place.
>     private val brokerConfigs = {
>       val configs = QuotaConfigs.brokerQuotaConfigs()
>       KafkaConfig.configKeys.filter {
>         case (configName, _) => AllDynamicConfigs.contains(configName)
>       }.foreach {
>         case (_, config) =>
>           configs.define(config.name, config.`type`, config.defaultValue, config.validator,
>             config.importance, config.documentation, config.group, config.orderInGroup, config.width,
>             config.displayName, config.dependents, config.recommender)
>       }
>       configs
>     }
>     // Computed in the same object, after brokerConfigs is fully populated.
>     val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- brokerConfigs.names.asScala
>   }
> }
> {code}
> [0] 
> [https://github.com/apache/kafka/blob/638844f833b165d6f9ca52c173858d26b7254fac/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L120]
> [1] 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicConfig.scala#L35]
> [2] 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicConfig.scala#L36]
> [3] 
> [https://github.com/apache/kafka/blob/638844f833b165d6f9ca52c173858d26b7254fac/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L120]
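The initialization-order problem in steps 1 to 4 can be reproduced outside Kafka. Below is a minimal Java sketch. It uses static initializers in place of Scala objects, since the JVM initializes both lazily on first use. `DynamicConfigLike`, `BrokerConfigLike` and the config names are hypothetical simplifications, not Kafka code.

```java
import java.util.HashSet;
import java.util.Set;

public class CircularInitDemo {
    // Hypothetical stand-in for DynamicConfig.Broker.
    static class DynamicConfigLike {
        // Step 1: the registry is created with the quota config only.
        static final Set<String> BROKER_CONFIGS = new HashSet<>(Set.of("quota.a"));

        static {
            // Step 2: handing the registry over triggers BrokerConfigLike's initialization
            // before "dyn.b" has been added.
            BrokerConfigLike.addDynamicConfigs(BROKER_CONFIGS);
        }

        static Set<String> names() {
            return Set.copyOf(BROKER_CONFIGS);
        }
    }

    // Hypothetical stand-in for DynamicBrokerConfig.
    static class BrokerConfigLike {
        static final Set<String> ALL_CONFIGS = Set.of("quota.a", "dyn.b", "static.c");

        // Step 3: evaluated while DynamicConfigLike is still initializing. The JVM lets the
        // same thread read BROKER_CONFIGS as it is at that moment, without "dyn.b".
        static final Set<String> NON_DYNAMIC_PROPS = difference(ALL_CONFIGS, DynamicConfigLike.names());

        static void addDynamicConfigs(Set<String> configs) {
            configs.add("dyn.b");
        }

        static Set<String> difference(Set<String> a, Set<String> b) {
            Set<String> result = new HashSet<>(a);
            result.removeAll(b);
            return result;
        }
    }

    public static void main(String[] args) {
        // Touching DynamicConfigLike first reproduces the problematic ordering.
        System.out.println(DynamicConfigLike.names());          // quota.a and dyn.b
        System.out.println(BrokerConfigLike.NON_DYNAMIC_PROPS); // dyn.b and static.c: dyn.b is wrong
    }
}
```

The result depends on which class is initialized first. Touching `BrokerConfigLike` first would give the correct `{static.c}`. Computing both sets inside a single holder, as the ticket proposes for `DynamicConfig.Broker`, removes that ordering dependency.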



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16944) Range assignor doesn't co-partition with stickiness

2024-06-12 Thread dujian0068 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854593#comment-17854593
 ] 

dujian0068 commented on KAFKA-16944:


Hello, does this problem need to be fixed quickly? If not, can it be assigned 
to me?
It is not easy for me to find a problem, and it may take me some time to deal 
with it

> Range assignor doesn't co-partition with stickiness
> ---
>
> Key: KAFKA-16944
> URL: https://issues.apache.org/jira/browse/KAFKA-16944
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: Ritika Reddy
>Priority: Major
>
> When stickiness is considered during range assignment, there are cases 
> where co-partitioning should be guaranteed but the assignor fails to provide it.
> An example would be:
> Consider two topics T1, T2 with 3 partitions each and three members A, B, C.
> Let's say the existing assignment (for whatever reason) is:
> {quote}A -> T1P0  ||  B -> T1P1, T2P0, T2P1, T2P2 || C -> T1P2
> {quote}
> Now we trigger a rebalance with the following subscriptions, where all members 
> are subscribed to both topics and everything else is the same:
> {quote}A -> T1, T2 || B -> T1, T2 || C -> T1, T2
> {quote}
> Since all the topics have the same number of partitions and all the members 
> are subscribed to the same set of topics, we would expect co-partitioning, 
> so the final assignment returned should be:
> {quote}A -> T1P0, T2P0  ||  B -> T1P1, T2P1 || C -> T1P2, T2P2
> {quote}
> Currently, the client-side assignor returns the following, but only because 
> it does not assign sticky partitions:
> {{C=[topic1-2, topic2-2], B=[topic1-1, topic2-1], A=[topic1-0, topic2-0]}}
> Our server-side assignor returns:
> (The partitions in bold are the sticky partitions)
> A=MemberAssignment(targetPartitions={topic2=[1], *topic1=[0]*})
> B=MemberAssignment(targetPartitions={*topic2=[0]*, *topic1=[1]*})
> C=MemberAssignment(targetPartitions={topic2=[2], *topic1=[2]*})
> *As seen above, co-partitioning is expected but not returned.*
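For reference, plain range assignment that ignores the existing assignment produces co-partitioning whenever every topic has the same partition count. Each topic is split into the same contiguous ranges, in the same member order. The sketch below shows that non-sticky behaviour only and does not implement the sticky logic under discussion. It assumes members are ordered by sorting their IDs; the class name and the `T1P0`-style partition labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeCoPartitionSketch {
    /**
     * Non-sticky range assignment: members are sorted once and every topic is split into
     * contiguous ranges in that same order, so equal partition counts give co-partitioning.
     */
    static Map<String, List<String>> assign(List<String> members, Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        Map<String, List<String>> result = new TreeMap<>();
        if (sorted.isEmpty()) {
            return result; // nothing to assign to
        }
        for (String member : sorted) {
            result.put(member, new ArrayList<>());
        }
        int numMembers = sorted.size();
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int base = topic.getValue() / numMembers;  // partitions every member gets
            int extra = topic.getValue() % numMembers; // the first `extra` members get one more
            int next = 0;
            for (int i = 0; i < numMembers; i++) {
                int count = base + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "P" + p);
                }
                next += count;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("A", "B", "C"), Map.of("T1", 3, "T2", 3)));
        // prints {A=[T1P0, T2P0], B=[T1P1, T2P1], C=[T1P2, T2P2]}
    }
}
```

Stickiness adds a preference for each member's previous partitions. The server-side result above shows how applying that preference per topic can break the shared ordering: A keeps T1P0 but receives T2P1.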





Re: [PR] KAFKA-16938: non-dynamic props gets corrupted due to circular reference between DynamicBrokerConfig and DynamicConfig. [kafka]

2024-06-12 Thread via GitHub


chia7712 merged PR #16302:
URL: https://github.com/apache/kafka/pull/16302





[jira] [Commented] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port

2024-06-12 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854591#comment-17854591
 ] 

Luke Chen commented on KAFKA-16946:
---

Go ahead and fix it, [~frankvicky]!

> Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port
> -
>
> Key: KAFKA-16946
> URL: https://issues.apache.org/jira/browse/KAFKA-16946
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.8.0
>Reporter: Luke Chen
>Priority: Blocker
>
> In KAFKA-16824, we tried to improve the regex for Utils.getHost/getPort, but 
> it now fails to parse SASL_PLAINTEXT://host:port. We need to fix it.





[jira] [Assigned] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port

2024-06-12 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-16946:
-

Assignee: TengYao Chi

> Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port
> -
>
> Key: KAFKA-16946
> URL: https://issues.apache.org/jira/browse/KAFKA-16946
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.8.0
>Reporter: Luke Chen
>Assignee: TengYao Chi
>Priority: Blocker
>
> In KAFKA-16824, we tried to improve the regex for Utils.getHost/getPort, but 
> it now fails to parse SASL_PLAINTEXT://host:port. We need to fix it.





Re: [PR] MINOR: Clean up for KafkaAdminClientTest [kafka]

2024-06-12 Thread via GitHub


chia7712 merged PR #16285:
URL: https://github.com/apache/kafka/pull/16285





Re: [PR] KAFKA-16824: Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports. [kafka]

2024-06-12 Thread via GitHub


cmccabe commented on PR #16048:
URL: https://github.com/apache/kafka/pull/16048#issuecomment-2164187718

   > @frankvicky it looks like with this change, SASL_PLAINTEXT://localhost:50132 
becomes an invalid URL because of the `_` in the protocol, and it's a breaking 
change. Is this expected?
   
   definitely a bug. we should fix or revert
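The failure comes from the protocol part of the pattern not accepting `_`. Below is a minimal sketch of a host/port pattern whose optional protocol prefix does accept underscores. The exact regex used by `Utils.getHost`/`Utils.getPort` is not shown in this thread, so `HostPortSketch` and its pattern are assumptions, not the actual fix.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HostPortSketch {
    // Optional "PROTOCOL://" prefix whose name may contain '_' (e.g. SASL_PLAINTEXT),
    // then a host (IPv6 literals may be wrapped in []), then ":port".
    private static final Pattern HOST_PORT_PATTERN = Pattern.compile(
            "^(?:[a-zA-Z][a-zA-Z\\d+\\-._]*://)?\\[?([0-9a-zA-Z\\-._%:]+)\\]?:([0-9]+)$");

    /** Returns the host part, or null if the address does not match. */
    static String getHost(String address) {
        Matcher matcher = HOST_PORT_PATTERN.matcher(address);
        return matcher.matches() ? matcher.group(1) : null;
    }

    /** Returns the port, or null if the address does not match (range and overflow are not checked). */
    static Integer getPort(String address) {
        Matcher matcher = HOST_PORT_PATTERN.matcher(address);
        return matcher.matches() ? Integer.valueOf(matcher.group(2)) : null;
    }

    public static void main(String[] args) {
        for (String protocol : new String[] {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}) {
            String address = protocol + "://mydomain.com:8080";
            System.out.println(address + " -> " + getHost(address) + ":" + getPort(address));
        }
    }
}
```

Exercising all four security protocols (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL), as `main` does above, is a cheap guard against this kind of regression.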





Re: [PR] KAFKA-16689: Move LogValidatorTest to storage module [kafka]

2024-06-12 Thread via GitHub


TaiJuWu commented on PR #16167:
URL: https://github.com/apache/kafka/pull/16167#issuecomment-2164170657

   @chia7712 Please take a look! Thanks.





Re: [PR] MINOR: Update 3.8 documentation for Kafka Streams [kafka]

2024-06-12 Thread via GitHub


ableegoldman commented on code in PR #16265:
URL: https://github.com/apache/kafka/pull/16265#discussion_r1637334663


##
docs/streams/developer-guide/config-streams.html:
##
@@ -441,8 +441,8 @@ num.standby.replicastask.assignor.class
 Medium
-A task assignor class or class name implementing 
the org.apache.kafka.streams.processor.assignment.TaskAssignor 
interface.
-The default high-availability task assignor.
+A task assignor class or class name implementing 
the TaskAssignor interface.

Review Comment:
   alternatively we might just want to rename the old interface to 
`LegacyTaskAssignor` or something like that. See my message on CC slack cc 
@mjsax 






Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-12 Thread via GitHub


kirktrue commented on PR #16241:
URL: https://github.com/apache/kafka/pull/16241#issuecomment-2164161606

   @lianetm @cadonna @AndrewJSchofield—PTAL at an alternate implementation of 
this Jira: #16310. Thanks!





[jira] [Updated] (KAFKA-13560) Load indexes and data in async manner in the critical path of replica fetcher threads.

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-13560:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Load indexes and data in async manner in the critical path of replica fetcher 
> threads. 
> ---
>
> Key: KAFKA-13560
> URL: https://issues.apache.org/jira/browse/KAFKA-13560
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.9.0
>
>
> https://github.com/apache/kafka/pull/11390#discussion_r762366976
> https://github.com/apache/kafka/pull/11390#discussion_r1033141283
> https://github.com/apache/kafka/pull/15690 removed the method below from the 
> `TierStateMachine` interface. It can be added back when we implement the 
> functionality required to address this issue.
> {code:java}
> public Optional maybeAdvanceState(TopicPartition 
> topicPartition, PartitionFetchState currentFetchState)
> {code}





[jira] [Updated] (KAFKA-15300) Include remotelog size in complete log size and also add local log size and remote log size separately in kafka-log-dirs tool.

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15300:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Include remotelog size in complete log size and also add local log size and 
> remote log size separately in kafka-log-dirs tool. 
> ---
>
> Key: KAFKA-15300
> URL: https://issues.apache.org/jira/browse/KAFKA-15300
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.9.0
>
>
> Include remotelog size in complete log size and also add local log size and 
> remote log size separately in kafka-log-dirs tool. 





[jira] [Updated] (KAFKA-15480) Add RemoteStorageInterruptedException

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15480:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Add RemoteStorageInterruptedException
> -
>
> Key: KAFKA-15480
> URL: https://issues.apache.org/jira/browse/KAFKA-15480
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Mital Awachat
>Priority: Major
>  Labels: kip
> Fix For: 3.9.0
>
>
> Introduce `RemoteStorageInterruptedException` to propagate interruptions from 
> the plugin to Kafka without generating (false) errors. 
> It allows the plugin to notify Kafka that an API operation in progress was 
> interrupted as a result of task cancellation, which can happen during changes 
> such as leadership migration or topic deletion.
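The sketch below shows the intended split between cancellation and genuine failure. It uses local stand-ins in which `RemoteStorageInterruptedException` extends a checked `RemoteStorageException`. That hierarchy, the `fetchSegment`/`copyTask` methods and the counters are assumptions for illustration; the real class layout is defined by the proposal, not here.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class InterruptPropagationSketch {
    // Hypothetical exception hierarchy for illustration only.
    static class RemoteStorageException extends Exception {
        RemoteStorageException(String message, Throwable cause) { super(message, cause); }
    }

    static class RemoteStorageInterruptedException extends RemoteStorageException {
        RemoteStorageInterruptedException(String message, Throwable cause) { super(message, cause); }
    }

    static final AtomicInteger ERRORS = new AtomicInteger();
    static final AtomicInteger CANCELLATIONS = new AtomicInteger();

    /** Plugin side: a blocking remote call that reports interruption with the dedicated type. */
    static void fetchSegment(String segmentId, boolean simulateFailure) throws RemoteStorageException {
        try {
            Thread.sleep(10); // stands in for a blocking remote-storage request
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
            throw new RemoteStorageInterruptedException("fetch of " + segmentId + " was cancelled", e);
        }
        if (simulateFailure) {
            throw new RemoteStorageException("remote store unavailable for " + segmentId, null);
        }
    }

    /** Broker side: real failures count as errors, cancellations do not. */
    static void copyTask(String segmentId, boolean simulateFailure) {
        try {
            fetchSegment(segmentId, simulateFailure);
        } catch (RemoteStorageInterruptedException e) {
            CANCELLATIONS.incrementAndGet(); // expected, e.g. after a leadership change
        } catch (RemoteStorageException e) {
            ERRORS.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate the task being cancelled
        copyTask("segment-0", false);
        Thread.interrupted();               // clear the flag again
        copyTask("segment-1", true);
        System.out.println("errors=" + ERRORS.get() + ", cancellations=" + CANCELLATIONS.get());
        // prints errors=1, cancellations=1
    }
}
```

The more specific exception must be caught first. Reversing the two `catch` clauses would not compile, which nudges callers toward handling cancellation explicitly.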





[jira] [Updated] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15214:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Add metrics for OffsetOutOfRangeException when tiered storage is enabled
> 
>
> Key: KAFKA-15214
> URL: https://issues.apache.org/jira/browse/KAFKA-15214
> Project: Kafka
>  Issue Type: Task
>  Components: metrics
>Affects Versions: 3.6.0
>Reporter: Lixin Yao
>Priority: Minor
>  Labels: KIP-405
> Fix For: 3.9.0
>
>
> The current RemoteReadErrorsPerSec metric does not include the 
> OffsetOutOfRangeException exception type.
> In our testing of the tiered storage feature (at Apple), we noticed several 
> cases where remote downloads were affected and stuck due to repeated 
> OffsetOutOfRangeException in some particular brokers or topic partitions. The 
> root causes can vary, but without a metric it is currently very hard to catch 
> this issue and debug it in a timely fashion. It's understandable that the 
> exception itself may not be the root cause, but this exception metric would be 
> a good signal for us to alert on and investigate.
> Related discussion
> [https://github.com/apache/kafka/pull/13944#discussion_r1266243006]
> I am happy to contribute to this if the request is accepted.
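One possible way to surface these failures is to count every remote-read error, broken down by exception type, so that OffsetOutOfRangeException gets its own series. The sketch below is hypothetical. It uses a local stand-in for `OffsetOutOfRangeException` and plain `LongAdder` counters rather than Kafka's metrics library.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class RemoteReadErrorCounters {
    /** Hypothetical local stand-in for Kafka's OffsetOutOfRangeException. */
    static class OffsetOutOfRangeException extends RuntimeException {
        OffsetOutOfRangeException(String message) { super(message); }
    }

    private final LongAdder allErrors = new LongAdder();
    private final Map<String, LongAdder> errorsByType = new ConcurrentHashMap<>();

    /** Records a failed remote read; no exception type is filtered out. */
    void record(Throwable error) {
        allErrors.increment();
        errorsByType.computeIfAbsent(error.getClass().getSimpleName(), type -> new LongAdder()).increment();
    }

    long total() {
        return allErrors.sum();
    }

    long count(String exceptionType) {
        LongAdder counter = errorsByType.get(exceptionType);
        return counter == null ? 0 : counter.sum();
    }

    public static void main(String[] args) {
        RemoteReadErrorCounters counters = new RemoteReadErrorCounters();
        counters.record(new OffsetOutOfRangeException("fetch offset is out of range"));
        counters.record(new IOException("remote store timed out"));
        System.out.println(counters.total() + " total, "
                + counters.count("OffsetOutOfRangeException") + " out of range"); // prints 2 total, 1 out of range
    }
}
```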





[jira] [Updated] (KAFKA-15529) Flaky test ReassignReplicaShrinkTest.executeTieredStorageTest

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15529:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Flaky test ReassignReplicaShrinkTest.executeTieredStorageTest
> -
>
> Key: KAFKA-15529
> URL: https://issues.apache.org/jira/browse/KAFKA-15529
> Project: Kafka
>  Issue Type: Test
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.9.0
>
>
> Example of failed CI build - 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14449/3/testReport/junit/org.apache.kafka.tiered.storage.integration/ReassignReplicaShrinkTest/Build___JDK_21_and_Scala_2_13___executeTieredStorageTest_String__quorum_kraft_2/]
>   
> {noformat}
> org.opentest4j.AssertionFailedError: Number of fetch requests from broker 0 
> to the tier storage does not match the expected value for topic-partition 
> topicA-1 ==> expected: <3> but was: <4>
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
>   at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
>   at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:559)
>   at 
> app//org.apache.kafka.tiered.storage.actions.ConsumeAction.doExecute(ConsumeAction.java:128)
>   at 
> app//org.apache.kafka.tiered.storage.TieredStorageTestAction.execute(TieredStorageTestAction.java:25)
>   at 
> app//org.apache.kafka.tiered.storage.TieredStorageTestHarness.executeTieredStorageTest(TieredStorageTestHarness.java:112){noformat}





[jira] [Updated] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15038:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Use topic id/name mapping from the Metadata cache in the RemoteLogManager
> -
>
> Key: KAFKA-15038
> URL: https://issues.apache.org/jira/browse/KAFKA-15038
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Owen C.H. Leung
>Priority: Minor
> Fix For: 3.9.0
>
>
> Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
> topic id 
> [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
>  using the information provided during leadership changes, and removes the 
> mapping upon receiving a notification that a partition was stopped.
> It should be possible to re-use the mapping in a broker's metadata cache, 
> removing the need for the RLM to build and update a local cache that 
> duplicates the information in the metadata cache. It also preserves a single 
> source of authority regarding the association between topic names 
> and ids.
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138
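Below is a minimal sketch of the proposed direction. It uses hypothetical `MetadataCacheLike` and `RemoteLogManagerLike` classes, not Kafka's real types. The remote-log component receives a lookup function backed by the metadata cache instead of keeping its own map. Removals are then visible immediately, and only one copy of the mapping exists.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class TopicIdLookupSketch {
    /** Hypothetical stand-in for the broker's metadata cache: the single owner of the mapping. */
    static class MetadataCacheLike {
        private final Map<String, UUID> topicIds = new ConcurrentHashMap<>();

        void put(String topic, UUID id) { topicIds.put(topic, id); }
        void remove(String topic) { topicIds.remove(topic); }
        Optional<UUID> topicId(String topic) { return Optional.ofNullable(topicIds.get(topic)); }
    }

    /** Hypothetical remote-log component that resolves IDs on demand instead of caching them. */
    static class RemoteLogManagerLike {
        private final Function<String, Optional<UUID>> topicIdLookup;

        RemoteLogManagerLike(Function<String, Optional<UUID>> topicIdLookup) {
            this.topicIdLookup = topicIdLookup;
        }

        String describe(String topic) {
            return topicIdLookup.apply(topic).map(id -> topic + " -> " + id).orElse(topic + " -> unknown");
        }
    }

    public static void main(String[] args) {
        MetadataCacheLike cache = new MetadataCacheLike();
        RemoteLogManagerLike rlm = new RemoteLogManagerLike(cache::topicId);
        cache.put("orders", UUID.randomUUID());
        System.out.println(rlm.describe("orders")); // resolved through the metadata cache
        cache.remove("orders");
        System.out.println(rlm.describe("orders")); // prints orders -> unknown (no stale copy)
    }
}
```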





[jira] [Commented] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port

2024-06-12 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854582#comment-17854582
 ] 

TengYao Chi commented on KAFKA-16946:
-

Hi [~showuon], if the reporter does not plan to work on it, I would like to fix 
this.

> Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port
> -
>
> Key: KAFKA-16946
> URL: https://issues.apache.org/jira/browse/KAFKA-16946
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.8.0
>Reporter: Luke Chen
>Priority: Blocker
>
> In KAFKA-16824, we tried to improve the regex for Utils.getHost/getPort, but 
> it now fails to parse SASL_PLAINTEXT://host:port. We need to fix it.





[jira] [Updated] (KAFKA-9578) Kafka Tiered Storage - System Tests

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-9578:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Kafka Tiered Storage - System Tests
> 
>
> Key: KAFKA-9578
> URL: https://issues.apache.org/jira/browse/KAFKA-9578
> Project: Kafka
>  Issue Type: Test
>Reporter: Harsha
>Priority: Major
> Fix For: 3.9.0
>
>
> Initial test cases set up by [~Ying Zheng] 
>  
> [https://docs.google.com/spreadsheets/d/1gS0s1FOmcjpKYXBddejXAoJAjEZ7AdEzMU9wZc-JgY8/edit#gid=0]





[jira] [Updated] (KAFKA-15132) Implement disable & re-enablement for Tiered Storage

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15132:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Implement disable & re-enablement for Tiered Storage
> 
>
> Key: KAFKA-15132
> URL: https://issues.apache.org/jira/browse/KAFKA-15132
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: kip
> Fix For: 3.9.0
>
>
> KIP-405 [1] introduces the Tiered Storage feature in Apache Kafka. One of the 
> limitations mentioned in the KIP is the inability to re-enable TS on a topic 
> after it has been disabled.
> {quote}Once tier storage is enabled for a topic, it can not be disabled. We 
> will add this feature in future versions. One possible workaround is to 
> create a new topic and copy the data from the desired offset and delete the 
> old topic. 
> {quote}
> This task will propose a new KIP which extends KIP-405 to describe the 
> behaviour on disablement and re-enablement of tiered storage for a topic. 
> The solution will apply to both the ZK and KRaft variants.
> [1] KIP-405 - 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
>  





[jira] [Updated] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15376:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Explore options of removing data earlier to the current leader's leader epoch 
> lineage for topics enabled with tiered storage.
> -
>
> Key: KAFKA-15376
> URL: https://issues.apache.org/jira/browse/KAFKA-15376
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.9.0
>
>
> Followup on the discussion thread:
> [https://github.com/apache/kafka/pull/13561#discussion_r1288778006]
>  





[jira] [Updated] (KAFKA-15864) Add more tests asserting the log-start-offset, local-log-start-offset, and HW/LSO/LEO in rolling over segments with tiered storage.

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15864:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Add more tests asserting the log-start-offset, local-log-start-offset, and 
> HW/LSO/LEO in rolling over segments with tiered storage.
> ---
>
> Key: KAFKA-15864
> URL: https://issues.apache.org/jira/browse/KAFKA-15864
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.9.0
>
>
> Followup on the 
> [comment|https://github.com/apache/kafka/pull/14766/files#r1395389551]





[jira] [Updated] (KAFKA-15094) Add RemoteIndexCache metrics like misses/evictions/load-failures.

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15094:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Add RemoteIndexCache metrics like misses/evictions/load-failures.
> -
>
> Key: KAFKA-15094
> URL: https://issues.apache.org/jira/browse/KAFKA-15094
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Satish Duggana
>Assignee: Abhijeet Kumar
>Priority: Major
> Fix For: 3.9.0
>
>
> Add metrics like hits/misses/evictions/load-failures for RemoteIndexCache.
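The sketch below shows where each of these counters would be incremented. It assumes a simple, single-threaded LRU cache built on an access-ordered `LinkedHashMap`. `CountingLruCache` is hypothetical and is not the actual `RemoteIndexCache` implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class CountingLruCache<K, V> {
    private final Map<K, V> entries;
    private long hits;
    private long misses;
    private long evictions;
    private long loadFailures;

    public CountingLruCache(int maxEntries) {
        // accessOrder = true keeps the least recently used entry first; removeEldestEntry evicts it.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                boolean evict = size() > maxEntries;
                if (evict) {
                    evictions++;
                }
                return evict;
            }
        };
    }

    /** Returns the cached value, loading it on a miss; a failing loader is counted and rethrown. */
    public V get(K key, Function<? super K, ? extends V> loader) {
        V value = entries.get(key); // assumes the loader never produces null values
        if (value != null) {
            hits++;
            return value;
        }
        misses++;
        try {
            value = loader.apply(key);
        } catch (RuntimeException e) {
            loadFailures++;
            throw e;
        }
        entries.put(key, value);
        return value;
    }

    public long hits() { return hits; }
    public long misses() { return misses; }
    public long evictions() { return evictions; }
    public long loadFailures() { return loadFailures; }
}
```

A real index cache is shared across threads and would need thread-safe counters and eviction. This sketch deliberately leaves that out to keep the counting logic visible.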





[jira] [Updated] (KAFKA-14915) Option to consume multiple partitions that have their data in remote storage for the target offsets.

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14915:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Option to consume multiple partitions that have their data in remote storage 
> for the target offsets.
> 
>
> Key: KAFKA-14915
> URL: https://issues.apache.org/jira/browse/KAFKA-14915
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Satish Duggana
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.9.0
>
>
> Context: https://github.com/apache/kafka/pull/13535#discussion_r1171250580





[jira] [Updated] (KAFKA-15195) Regenerate segment-aligned producer snapshots when upgrading to a Kafka version supporting Tiered Storage

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15195:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Regenerate segment-aligned producer snapshots when upgrading to a Kafka 
> version supporting Tiered Storage
> -
>
> Key: KAFKA-15195
> URL: https://issues.apache.org/jira/browse/KAFKA-15195
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
> Fix For: 3.9.0
>
>
> As mentioned in KIP-405: Kafka Tiered Storage#Upgrade, a customer wishing to 
> upgrade from a Kafka version < 2.8.0 to 3.6 and turn Tiered Storage on will 
> have to wait for retention to clean up segments without an associated 
> producer snapshot.
> However, in our experience, customers of Kafka expect to be able to 
> immediately enable tiering on a topic once their cluster upgrade is complete. 
> Once they do this, however, they start seeing NPEs and no data is uploaded to 
> Tiered Storage 
> (https://github.com/apache/kafka/blob/9e50f7cdd37f923cfef4711cf11c1c5271a0a6c7/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java#L61).
> To achieve this, we propose changing Kafka to retroactively create producer 
> snapshot files on upload whenever a segment is due to be archived and lacks 
> one.





[jira] [Updated] (KAFKA-15301) [Tiered Storage] Historically compacted topics send request to remote for active segment during consume

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15301:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> [Tiered Storage] Historically compacted topics send request to remote for 
> active segment during consume
> ---
>
> Key: KAFKA-15301
> URL: https://issues.apache.org/jira/browse/KAFKA-15301
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Mital Awachat
>Assignee: Jimmy Wang
>Priority: Major
> Fix For: 3.9.0
>
>
> I have a use case where the tiered storage plugin received requests for active 
> segments. The topics for which this happened were historically compacted 
> topics for which compaction was disabled and tiering was enabled.
> Steps to reproduce: create a topic with the compact cleanup policy -> produce 
> data with a few repeated keys and create multiple segments -> let compaction 
> happen -> change the cleanup policy to delete -> produce some more data for 
> segment rollover -> enable tiering on the topic -> wait for segments to be 
> uploaded to remote storage and cleaned up locally (the active segment remains) 
> -> consume from the beginning -> observe the logs.





[jira] [Updated] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15341:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.9.0
>
>
> During a rolling restart to enable TS at the system level, some brokers have 
> TS enabled and some don't. Suppose we send an alter config call to enable TS 
> for a topic and it hits a broker which has TS enabled: this broker forwards it 
> to the controller, and the controller sends the config update to all brokers. 
> When another broker which doesn't have TS enabled (because it hasn't undergone 
> the restart yet) gets this config change, it "should" fail to apply it. But 
> failing at that point is too late, because alterConfig has already succeeded: 
> controller->broker config propagation is done asynchronously.
> With this JIRA, we want the controller to check that TS is enabled on all 
> brokers before applying an alter config that turns on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129
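Below is a minimal sketch of the proposed controller-side check. It assumes a hypothetical `brokerId -> enabled` view of each broker's tiered storage setting. The ticket does not specify how the controller would actually learn that per-broker state.

```java
import java.util.Map;

public class TieredStorageEnableCheck {
    /**
     * Controller-side validation: refuse to turn on tiered storage for a topic unless every
     * registered broker reports that tiered storage is enabled on it.
     */
    static void validateEnableTieredStorage(Map<Integer, Boolean> tieredStorageEnabledByBroker) {
        for (Map.Entry<Integer, Boolean> broker : tieredStorageEnabledByBroker.entrySet()) {
            if (!Boolean.TRUE.equals(broker.getValue())) {
                throw new IllegalStateException("Tiered storage is not enabled on broker " + broker.getKey()
                        + "; rejecting the request to enable it for the topic");
            }
        }
    }

    public static void main(String[] args) {
        validateEnableTieredStorage(Map.of(0, true, 1, true)); // accepted
        try {
            validateEnableTieredStorage(Map.of(0, true, 1, false)); // broker 1 not restarted yet
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The check runs before the config change is committed, so the alter-config request fails synchronously. It is never accepted only to fail later on a lagging broker.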





[jira] [Updated] (KAFKA-13355) Shutdown broker eventually when unrecoverable exceptions like IOException encountered in RLMM.

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-13355:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Shutdown broker eventually when unrecoverable exceptions like IOException 
> encountered in RLMM. 
> ---
>
> Key: KAFKA-13355
> URL: https://issues.apache.org/jira/browse/KAFKA-13355
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Abhijeet Kumar
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.9.0
>
>
> Add a mechanism to catch unrecoverable exceptions such as IOException from the 
> RLMM and shut down the broker, as is done in the log layer.
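Below is a minimal sketch of the idea, assuming a hypothetical `shutdownBroker` callback. Every RLMM call goes through a guard that treats `IOException` as unrecoverable. The guard requests a shutdown exactly once and still surfaces the failure to the caller. The class and interface names are illustrative only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class RlmmFailureHandlerSketch {
    /** An RLMM call that may fail with an I/O error. */
    @FunctionalInterface
    interface MetadataOperation {
        void run() throws IOException;
    }

    private final Runnable shutdownBroker;
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    RlmmFailureHandlerSketch(Runnable shutdownBroker) {
        this.shutdownBroker = shutdownBroker;
    }

    /** Runs the call; an IOException is treated as unrecoverable and triggers one shutdown request. */
    void runGuarded(MetadataOperation operation) {
        try {
            operation.run();
        } catch (IOException e) {
            if (shutdownRequested.compareAndSet(false, true)) {
                shutdownBroker.run();
            }
            throw new UncheckedIOException("Unrecoverable RLMM failure", e);
        }
    }

    public static void main(String[] args) {
        RlmmFailureHandlerSketch handler =
                new RlmmFailureHandlerSketch(() -> System.out.println("shutting down broker"));
        handler.runGuarded(() -> { }); // succeeds, nothing happens
        for (int i = 0; i < 2; i++) {
            try {
                handler.runGuarded(() -> { throw new IOException("metadata store unreadable"); });
            } catch (UncheckedIOException expected) {
                // the caller still sees the failure
            }
        }
        // "shutting down broker" is printed only once
    }
}
```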





[jira] [Updated] (KAFKA-15969) Align RemoteStorageThreadPool metrics name with KIP-405

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15969:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Align RemoteStorageThreadPool metrics name with KIP-405
> ---
>
> Key: KAFKA-15969
> URL: https://issues.apache.org/jira/browse/KAFKA-15969
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 3.6.0
>Reporter: Lixin Yao
>Priority: Minor
> Fix For: 3.9.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> In KIP-405, there are 2 metrics defined below:
> {{kafka.log.remote:type=RemoteStorageThreadPool, name=RemoteLogReaderTaskQueueSize}}
> and
> {{kafka.log.remote:type=RemoteStorageThreadPool, name=RemoteLogReaderAvgIdlePercent}}
> However, in the Kafka 3.6 release, the metrics actually exposed are:
> {{org.apache.kafka.storage.internals.log:name=RemoteLogReaderAvgIdlePercent,type=RemoteStorageThreadPool}}
> {{org.apache.kafka.storage.internals.log:name=RemoteLogReaderTaskQueueSize,type=RemoteStorageThreadPool}}
> The problem is that the bean domain name changed from {{kafka.log.remote}} to 
> {{org.apache.kafka.storage.internals.log}}, and the type name also changed.
> We should either update the metric paths in the KIP or fix the paths in the code.
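The mismatch matters for anyone monitoring these metrics over JMX, because a query written against the KIP's domain matches nothing. The sketch below demonstrates this with the standard `javax.management.ObjectName` pattern matching. The two names are copied from the ticket with the space after the comma removed; the class name is hypothetical.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class MetricNameMismatch {
    static final String KIP_NAME =
            "kafka.log.remote:type=RemoteStorageThreadPool,name=RemoteLogReaderTaskQueueSize";
    static final String ACTUAL_NAME =
            "org.apache.kafka.storage.internals.log:name=RemoteLogReaderTaskQueueSize,type=RemoteStorageThreadPool";

    /** True if a JMX query written against the KIP's domain would match the given bean name. */
    static boolean matchesKipQuery(String beanName) {
        try {
            ObjectName kipQuery = new ObjectName("kafka.log.remote:type=RemoteStorageThreadPool,*");
            return kipQuery.apply(new ObjectName(beanName));
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(matchesKipQuery(KIP_NAME));    // prints true
        System.out.println(matchesKipQuery(ACTUAL_NAME)); // prints false: the domain differs
    }
}
```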





[jira] [Updated] (KAFKA-15331) Handle remote log enabled topic deletion when leader is not available

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15331:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Handle remote log enabled topic deletion when leader is not available
> -
>
> Key: KAFKA-15331
> URL: https://issues.apache.org/jira/browse/KAFKA-15331
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kamal Chandraprakash
>Assignee: hudeqi
>Priority: Major
> Fix For: 3.9.0
>
>
> When a topic is deleted, there can be a case where all of its replicas are 
> out of the ISR. This case is not handled; see 
> [https://github.com/apache/kafka/pull/13947#discussion_r1289331347] for more 
> details.





[jira] [Updated] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15388:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Major
> Fix For: 3.9.0
>
> Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, 
> tieredtopicloglist.png
>
>
> Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517]
>  
> There are 3 paths I looked at:
>  * When data is moved to remote storage (1)
>  * When data is read from remote storage (2)
>  * When data is deleted from remote storage (3)
> (1) Does not have a problem with compacted topics. Compacted segments are 
> uploaded and their metadata claims they contain offsets from the baseOffset of 
> the segment until the next segment's baseOffset. There are no gaps in offsets.
> (2) Does not have a problem if a customer is querying offsets which do not 
> exist within a segment, but there are offsets after the queried offset within 
> the same segment. *However, it does have a problem when the next available 
> offset is in a subsequent segment.*
> (3) For data deleted via DeleteRecords there is no problem. For data deleted 
> via retention there is no problem.
>  
> *I believe the proper solution to (2) is to make tiered storage continue 
> looking for the next greater offset in subsequent segments.*
> Steps to reproduce the issue:
> {code:java}
> // TODO (christo)
> {code}
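The fix proposed for (2), continuing the search for the next greater offset in subsequent segments, can be sketched as below. This is a simplified model under stated assumptions, not the actual RemoteLogManager code: each remote segment is represented only by its baseOffset and the offsets that survived compaction, held in a `TreeMap`; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

public class RemoteFetchSketch {
    // Hypothetical model: baseOffset -> sorted offsets still present in that segment after compaction.
    private final NavigableMap<Long, long[]> segments = new TreeMap<>();

    public void addSegment(long baseOffset, long... offsets) {
        segments.put(baseOffset, offsets);
    }

    // Returns the first available offset >= target, continuing into later
    // segments when the segment that covers the target has nothing left.
    public OptionalLong nextAvailableOffset(long target) {
        Map.Entry<Long, long[]> entry = segments.floorEntry(target);
        if (entry == null) {
            entry = segments.firstEntry(); // target precedes every segment
        }
        while (entry != null) {
            for (long offset : entry.getValue()) {
                if (offset >= target) {
                    return OptionalLong.of(offset);
                }
            }
            // Nothing at or after target in this segment: move on to the next one.
            entry = segments.higherEntry(entry.getKey());
        }
        return OptionalLong.empty();
    }
}
```

With segments based at 0 (offsets 0, 1, 2), 10 (offsets 10, 11, 15) and 20 (offset 25), a query for 16 finds nothing in the segment based at 10 and returns 25 from the next segment, which is exactly the case the issue marks as broken.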





Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]

2024-06-12 Thread via GitHub


frankvicky commented on code in PR #16250:
URL: https://github.com/apache/kafka/pull/16250#discussion_r1637320049


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -1988,6 +1989,15 @@ public void testDescribeClientMetricsConfigs() throws 
Exception {
 }
 }
 
+@Test
+public void testCreateDescribeConfigsByDuplicateResources() {
+ConfigResource resource = new 
ConfigResource(ConfigResource.Type.BROKER, "1");
+ConfigResource duplicateResource = new 
ConfigResource(ConfigResource.Type.BROKER, "1");
+try (AdminClientUnitTestEnv env = mockClientEnv()) {
+assertDoesNotThrow(() -> 
env.adminClient().describeConfigs(asList(resource, duplicateResource)));

Review Comment:
   Cool, I will do that 
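The new test only asserts that `describeConfigs` does not throw when the same resource is passed twice. One generic way a map-returning API can throw on duplicate inputs is collecting into a `Map` with `Collectors.toMap` and no merge function; this is offered as an assumption for illustration, not as the diagnosed cause in `KafkaAdminClient`. Plain strings stand in for `ConfigResource`, and the class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateKeySketch {
    // Collectors.toMap without a merge function throws IllegalStateException
    // ("Duplicate key ...") as soon as two inputs map to the same key.
    public static Map<String, String> naive(List<String> resources) {
        return resources.stream()
            .collect(Collectors.toMap(Function.identity(), r -> "future-for-" + r));
    }

    // Deduplicating first (keeping insertion order) makes duplicate inputs harmless.
    public static Map<String, String> deduplicated(List<String> resources) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String r : new LinkedHashSet<>(resources)) {
            result.put(r, "future-for-" + r);
        }
        return result;
    }
}
```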



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16824: Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports. [kafka]

2024-06-12 Thread via GitHub


frankvicky commented on PR #16048:
URL: https://github.com/apache/kafka/pull/16048#issuecomment-2164151022

   Hi @zhaochun-ma 
   Thanks for the reminder. 
   If you have a free cycle, it would be nice if you could open a PR to fix it.





Re: [PR] KAFKA-16824: Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports. [kafka]

2024-06-12 Thread via GitHub


zhaochun-ma commented on PR #16048:
URL: https://github.com/apache/kafka/pull/16048#issuecomment-2164138119

   > @zhaochun-ma , nice find! I've created 
[KAFKA-16946](https://issues.apache.org/jira/browse/KAFKA-16946) for this 
issue. Are you interested in opening a PR to fix it?
   
   I haven't worked on Kafka before; it might be better for someone else to pick 
it up.





Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]

2024-06-12 Thread via GitHub


xiaoqingwanga commented on PR #16303:
URL: https://github.com/apache/kafka/pull/16303#issuecomment-2164138080

   @gharris1727 Thanks for the tip! I'll look into it and tweak the code a bit.





[jira] [Commented] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2024-06-12 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854579#comment-17854579
 ] 

Satish Duggana commented on KAFKA-14877:


InMemoryLeaderEpochCheckpoint seems to have been deleted as part of other refactoring.

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Priority: Minor
> Fix For: 3.8.0
>
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477





[jira] [Resolved] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana resolved KAFKA-14877.

Resolution: Invalid

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Priority: Minor
> Fix For: 3.8.0
>
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477





[jira] [Resolved] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests

2024-06-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16935.
---
Resolution: Fixed

> Automatically wait for cluster startup in embedded Connect integration tests
> 
>
> Key: KAFKA-16935
> URL: https://issues.apache.org/jira/browse/KAFKA-16935
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> It's a common idiom in our integration tests to [start an embedded Kafka and 
> Connect 
> cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135]
>  and then immediately afterwards [wait for each worker in the Connect cluster 
> to complete 
> startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92].
>  Separating these two actions into separate steps makes our tests lengthier 
> and can even lead to bugs and flakiness if the second step is accidentally 
> omitted (see [https://github.com/apache/kafka/pull/16286] for one example).
> Instead, we should default to automatically awaiting the complete startup of 
> every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} 
> is invoked, and require callers to opt out if they do not want to 
> automatically wait for startup to complete when invoking that method.
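The proposed default (await startup of every worker inside `start()`, with an explicit opt-out) can be sketched generically as below. Every name here is hypothetical and stands in for the real `EmbeddedConnect` and `ConnectAssertions` classes, whose actual signatures are not reproduced; the point is only the shape of the API: waiting is the default and skipping it requires a deliberate argument.

```java
import java.util.ArrayList;
import java.util.List;

public class EmbeddedClusterSketch {
    // Hypothetical stand-in for a Connect worker; not the real EmbeddedConnect API.
    public interface Worker {
        void start();

        // Blocks until the worker reports that startup has completed.
        void awaitStartup();
    }

    private final List<Worker> workers;
    private final boolean awaitStartupOnStart;

    // Callers must opt out explicitly to skip waiting.
    public EmbeddedClusterSketch(List<Worker> workers, boolean awaitStartupOnStart) {
        this.workers = new ArrayList<>(workers);
        this.awaitStartupOnStart = awaitStartupOnStart;
    }

    // Default: start() also waits for every worker, so tests cannot forget the second step.
    public EmbeddedClusterSketch(List<Worker> workers) {
        this(workers, true);
    }

    public void start() {
        for (Worker w : workers) {
            w.start();
        }
        if (awaitStartupOnStart) {
            for (Worker w : workers) {
                w.awaitStartup();
            }
        }
    }
}
```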





[jira] [Reopened] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests

2024-06-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-16935:
---

> Automatically wait for cluster startup in embedded Connect integration tests
> 
>
> Key: KAFKA-16935
> URL: https://issues.apache.org/jira/browse/KAFKA-16935
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> It's a common idiom in our integration tests to [start an embedded Kafka and 
> Connect 
> cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135]
>  and then immediately afterwards [wait for each worker in the Connect cluster 
> to complete 
> startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92].
>  Separating these two actions into separate steps makes our tests lengthier 
> and can even lead to bugs and flakiness if the second step is accidentally 
> omitted (see [https://github.com/apache/kafka/pull/16286] for one example).
> Instead, we should default to automatically awaiting the complete startup of 
> every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} 
> is invoked, and require callers to opt out if they do not want to 
> automatically wait for startup to complete when invoking that method.





[jira] [Updated] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests

2024-06-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16935:
--
Fix Version/s: 3.8.0

> Automatically wait for cluster startup in embedded Connect integration tests
> 
>
> Key: KAFKA-16935
> URL: https://issues.apache.org/jira/browse/KAFKA-16935
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> It's a common idiom in our integration tests to [start an embedded Kafka and 
> Connect 
> cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135]
>  and then immediately afterwards [wait for each worker in the Connect cluster 
> to complete 
> startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92].
>  Separating these two actions into separate steps makes our tests lengthier 
> and can even lead to bugs and flakiness if the second step is accidentally 
> omitted (see [https://github.com/apache/kafka/pull/16286] for one example).
> Instead, we should default to automatically awaiting the complete startup of 
> every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} 
> is invoked, and require callers to opt out if they do not want to 
> automatically wait for startup to complete when invoking that method.





Re: [PR] KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests [kafka]

2024-06-12 Thread via GitHub


C0urante merged PR #16288:
URL: https://github.com/apache/kafka/pull/16288





[jira] [Updated] (KAFKA-16947) Kafka Tiered Storage V2

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-16947:
---
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Kafka Tiered Storage V2
> ---
>
> Key: KAFKA-16947
> URL: https://issues.apache.org/jira/browse/KAFKA-16947
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.9.0
>
>






[jira] [Resolved] (KAFKA-13561) Consider deprecating `StreamsBuilder#build(props)` function

2024-06-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13561.
-
Resolution: Duplicate

> Consider deprecating `StreamsBuilder#build(props)` function
> ---
>
> Key: KAFKA-13561
> URL: https://issues.apache.org/jira/browse/KAFKA-13561
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> With 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store
>  being accepted that introduced the new `StreamsBuilder(TopologyConfig)` 
> constructor, we can consider deprecating the `StreamsBuilder#build(props)` 
> function now. There are still a few things we'd need to do:
> 1. Copy the `StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG` to TopologyConfig.
> 2. Make sure the overloaded `StreamsBuilder()` constructor takes in default 
> values of TopologyConfig.





[jira] [Updated] (KAFKA-16947) Kafka Tiered Storage V2

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-16947:
---
Affects Version/s: (was: 3.6.0)

> Kafka Tiered Storage V2
> ---
>
> Key: KAFKA-16947
> URL: https://issues.apache.org/jira/browse/KAFKA-16947
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.8.0
>
>






[jira] [Created] (KAFKA-16947) Kafka Tiered Storage V2

2024-06-12 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-16947:
--

 Summary: Kafka Tiered Storage V2
 Key: KAFKA-16947
 URL: https://issues.apache.org/jira/browse/KAFKA-16947
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 3.6.0
Reporter: Satish Duggana
Assignee: Satish Duggana
 Fix For: 3.8.0








[jira] [Commented] (KAFKA-16945) Cleanup StreamsBuilder and TopologyConfig

2024-06-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854578#comment-17854578
 ] 

Matthias J. Sax commented on KAFKA-16945:
-

I did not look... :P

Found https://issues.apache.org/jira/browse/KAFKA-13561 but it only mentions 
`builder(Properties)` and nothing about `TopologyConfig`, so I'll go ahead and 
close it, as this ticket is more generic.

> Cleanup StreamsBuilder and TopologyConfig
> -
>
> Key: KAFKA-16945
> URL: https://issues.apache.org/jira/browse/KAFKA-16945
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Historically, Kafka Streams offers two ways to build a topology: either via 
> the PAPI by creating a `new Topology()` explicitly, or via the 
> `StreamsBuilder` which returns a topology via `build()` method.
> We later added an overload `build(Properties)` to enable topology 
> optimizations for the DSL layer.
> Furthermore, we also added `TopologyConfig` object, which can be passed into 
> `new Topology(TopologyConfig)` as well as `StreamsBuilder(TopologyConfig)`.
> We should consider unifying the different approaches to simplify the rather 
> complex API we have right now.





[jira] [Resolved] (KAFKA-16890) Failing to build aux state on broker failover

2024-06-12 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana resolved KAFKA-16890.

Resolution: Fixed

> Failing to build aux state on broker failover
> -
>
> Key: KAFKA-16890
> URL: https://issues.apache.org/jira/browse/KAFKA-16890
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0, 3.7.1
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.8.0
>
>
> We have clusters where we replace machines often, and they fall into a state 
> where we keep getting "Error building remote log auxiliary state for 
> loadtest_topic-22" and the partition stays under-replicated until the leader 
> is manually restarted. 
> Looking into a specific case, here is what we observed in 
> __remote_log_metadata topic:
> {code:java}
>  
> partition: 29, offset: 183593, value: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22,
>  id=GZeRTXLMSNe2BQjRXkg6hQ}, startOffset=10823, endOffset=11536, 
> brokerId=10013, maxTimestampMs=1715774588597, eventTimestampMs=1715781657604, 
> segmentLeaderEpochs={125=10823, 126=10968, 128=11047, 130=11048, 131=11324, 
> 133=11442, 134=11443, 135=11445, 136=11521, 137=11533, 139=11535}, 
> segmentSizeInBytes=704895, customMetadata=Optional.empty, 
> state=COPY_SEGMENT_STARTED}
> partition: 29, offset: 183594, value: 
> RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22,
>  id=GZeRTXLMSNe2BQjRXkg6hQ}, customMetadata=Optional.empty, 
> state=COPY_SEGMENT_FINISHED, eventTimestampMs=1715781658183, brokerId=10013}
> partition: 29, offset: 183669, value: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22,
>  id=L1TYzx0lQkagRIF86Kp0QQ}, startOffset=10823, endOffset=11544, 
> brokerId=10008, maxTimestampMs=1715781445270, eventTimestampMs=1715782717593, 
> segmentLeaderEpochs={125=10823, 126=10968, 128=11047, 130=11048, 131=11324, 
> 133=11442, 134=11443, 135=11445, 136=11521, 137=11533, 139=11535, 140=11537, 
> 142=11543}, segmentSizeInBytes=713088, customMetadata=Optional.empty, 
> state=COPY_SEGMENT_STARTED}
> partition: 29, offset: 183670, value: 
> RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22,
>  id=L1TYzx0lQkagRIF86Kp0QQ}, customMetadata=Optional.empty, 
> state=COPY_SEGMENT_FINISHED, eventTimestampMs=1715782718370, brokerId=10008}
> partition: 29, offset: 186215, value: 
> RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22,
>  id=L1TYzx0lQkagRIF86Kp0QQ}, customMetadata=Optional.empty, 
> state=DELETE_SEGMENT_STARTED, eventTimestampMs=1715867874617, brokerId=10008}
> partition: 29, offset: 186216, value: 
> RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22,
>  id=L1TYzx0lQkagRIF86Kp0QQ}, customMetadata=Optional.empty, 
> state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1715867874725, brokerId=10008}
> partition: 29, offset: 186217, value: 
> RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22,
>  id=GZeRTXLMSNe2BQjRXkg6hQ}, customMetadata=Optional.empty, 
> state=DELETE_SEGMENT_STARTED, eventTimestampMs=1715867874729, brokerId=10008}
> partition: 29, offset: 186218, value: 
> RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22,
>  id=GZeRTXLMSNe2BQjRXkg6hQ}, customMetadata=Optional.empty, 
> state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1715867874817, brokerId=10008}
> {code}
>  
> It seems that at the time the leader (10013) is restarted, a second copy of 
> the same segment is tiered by the new leader (10008). Interestingly, the 
> segment doesn't have the same end offset, which is concerning. 
> Then the follower sees the following error repeatedly until the leader is 
> restarted: 
>  
> {code:java}
> [2024-05-17 20:46:42,133] DEBUG [ReplicaFetcher replicaId=10013, 
> leaderId=10008, fetcherId=0] Handling errors in processFetchRequest for 
> partitions HashSet(loadtest_topic-22) (kafka.server.ReplicaFetcherThread)
> [2024-05-17 20:46:43,174] DEBUG [ReplicaFetcher replicaId=10013, 
> leaderId=10008, fetcherId=0] Received error OFFSET_MOVED_TO_TIERED_STORAGE, 
> at fetch offset: 11537, topic-partition: loadtest_topic-22 
> (kafka.server.ReplicaFetcherThread)
> [2024-05-17 20:46:43,175] ERROR [ReplicaFetcher replicaId=10013, 
> leaderId=10008, fetcherId=0] Error 

[jira] [Commented] (KAFKA-16925) stream-table join does not immediately forward expired records on restart

2024-06-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854577#comment-17854577
 ] 

Matthias J. Sax commented on KAFKA-16925:
-

{quote}My understanding is that the processor here wants to know the maximum 
observed stream time so far (including the current record), and 
context.currentStreamTimeMs() is set from the timestamp of input records. For 
me that's the information this processor is looking for ?
{quote}
Assume there is an upstream processor with a state store, and the upstream 
processor puts input records into the state store, but does not yet forward them 
(could be more than one record). Thus, task stream-time would account for the 
records buffered in the state store. Assume that this advanced stream-time 
might close some downstream window in the join. Later, the upstream processor 
takes the records out of the store and finally forwards them into the join – 
if the join used the task-tracked stream-time, it might now incorrectly drop the 
records because the window was already closed (after a rebalance, re-initializing 
stream-time would make it jump forward), even though there is no actual reason to 
drop the records. Thus, IMHO, the task's tracked stream-time should not be used 
to init the operator's stream-time.
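For reference, the grace-period rule from KIP-923 that this issue is about (a record is buffered until its timestamp is at most stream-time minus the grace period; late records are joined right away) can be modeled as below. This is a simplified sketch, not the actual `KStreamKTableJoin` or `RocksDBTimeOrderedKeyValueBuffer` code: the join itself is omitted, and the `initialStreamTime` constructor argument is a hypothetical knob that models what the operator starts from after a restart (`Long.MIN_VALUE` if nothing was restored).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class GraceBufferSketch {
    public static final class Rec {
        public final String key;
        public final String value;
        public final long ts;

        public Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final long graceMs;
    private long observedStreamTime;
    private final PriorityQueue<Rec> buffer = new PriorityQueue<>(Comparator.comparingLong((Rec r) -> r.ts));

    // initialStreamTime: Long.MIN_VALUE models "stream-time lost on restart",
    // any other value models a stream-time restored from somewhere.
    public GraceBufferSketch(long graceMs, long initialStreamTime) {
        this.graceMs = graceMs;
        this.observedStreamTime = initialStreamTime;
    }

    // Returns the records that would be handed to the join, in emit order.
    public List<Rec> process(Rec rec) {
        observedStreamTime = Math.max(observedStreamTime, rec.ts);
        long cutoff = observedStreamTime - graceMs;
        List<Rec> emitted = new ArrayList<>();
        if (rec.ts <= cutoff) {
            emitted.add(rec); // late: outside the grace period, joined immediately
        } else {
            buffer.add(rec); // within the grace period: wait in the buffer
        }
        // Dequeue buffered records whose timestamp is now <= stream-time - grace.
        while (!buffer.isEmpty() && buffer.peek().ts <= cutoff) {
            emitted.add(buffer.poll());
        }
        return emitted;
    }
}
```

With a 60 s grace period, a record at T = 100 000 ms followed by one at T - 60 s emits the second record immediately. A fresh instance that has lost stream-time instead buffers that same record, which is the restart behavior the issue reports; the open question in this thread is where a correct initial stream-time should come from.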

Putting the information into the store is usually tricky. – We need to find a 
key for the stream time which is guaranteed to be unique and does not conflict 
with any data key from the actual records we process... It would become 
a very operator- (or store-) specific solution, which does not seem generic 
(especially for PAPI usage, it would be nice to have something generic inside 
the KS runtime that is independent of the processors and/or state stores used).
{quote}I am not talking about general case, where I agree that it is better to 
put the logic in the store itself or any other way that is reusable in 
processors.
{quote}
Not sure if we would need to put it into the store – IMHO, we could also track 
stream-time inside the KS runtime on a per-operator basis, but as it might 
result in some overhead, we might want to make it opt-in and disabled by 
default. The tracked time is stored via commit offset metadata, and this 
metadata size can become an issue if it gets too large (thus we should try to keep it 
as small as possible). – Which actually raises a more general question anyway: 
this metadata has grown over time, and we have even considered adding an 
in-memory metadata store (backed by a topic) to get rid of the commit metadata 
overhead...
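To illustrate why per-operator tracking raises the metadata-size concern, here is a hypothetical encoding of one stream-time per operator into a commit-metadata string. It is not Kafka Streams' actual metadata format: the version byte, the layout, and the operator names in the usage below are all assumptions made for the sketch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

public class StreamTimeMetadataSketch {
    static final byte VERSION = 1; // hypothetical format version

    // Layout: version byte, entry count, then (name length, UTF-8 name, stream-time) per operator.
    public static String encode(Map<String, Long> streamTimes) {
        int size = 1 + Integer.BYTES;
        for (String name : streamTimes.keySet()) {
            size += Integer.BYTES + name.getBytes(StandardCharsets.UTF_8).length + Long.BYTES;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put(VERSION);
        buf.putInt(streamTimes.size());
        for (Map.Entry<String, Long> e : streamTimes.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putInt(name.length);
            buf.put(name);
            buf.putLong(e.getValue());
        }
        // Commit metadata is a String, so the binary payload is Base64-encoded.
        return Base64.getEncoder().encodeToString(buf.array());
    }

    public static Map<String, Long> decode(String metadata) {
        ByteBuffer buf = ByteBuffer.wrap(Base64.getDecoder().decode(metadata));
        byte version = buf.get();
        if (version != VERSION) {
            throw new IllegalArgumentException("unknown metadata version " + version);
        }
        int count = buf.getInt();
        Map<String, Long> result = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.getInt()];
            buf.get(name);
            result.put(new String(name, StandardCharsets.UTF_8), buf.getLong());
        }
        return result;
    }
}
```

In this layout each tracked operator adds its name plus 12 bytes before Base64 encoding, and roughly a third more after it, so the metadata grows linearly with the number of stateful operators per task.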

I would really like to get to a more holistic solution (we have bolted on too 
many island solutions over the years, which was totally OK for some time, but I 
believe we are reaching a state where it's worth building a good generic 
solution and to stop adding more island solutions...)

> stream-table join does not immediately forward expired records on restart
> -
>
> Key: KAFKA-16925
> URL: https://issues.apache.org/jira/browse/KAFKA-16925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
>  introduced a grace period for KStreamKTableJoin. This allows joining a stream 
> to a KTable backed by a Versioned state store.
> Upon receiving a record, it is put in a buffer until the grace period has elapsed. 
> When the grace period elapses, the record is joined with its most recent 
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>  
> {code:java}
> If the grace period is non zero, the record will enter a stream buffer and 
> will dequeue when the record timestamp is less than or equal to stream time 
> minus the grace period.  Late records, out of the grace period, will be 
> executed right as they come in. (KIP-923){code}
>  
> However, this is not the case today on rebalance or restart. The reason is 
> that observedStreamTime is taken from the underlying state store, which loses 
> this information on rebalance/restart:  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]
>  
> If the task restarts and receives an expired record, the buffer considers 
> that this record has the maximum stream time observed so far, and puts it in 
> the buffer instead of immediately joining it.
>  
> {*}Example{*}:
>  * Grace period = 60s
>  * KTable contains (key, rightValue)
>  
> +Normal scenario+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> streamInput2 (key, value2) <--- time = T - 60s : immediately joined 

Re: [PR] KAFKA-16824: Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports. [kafka]

2024-06-12 Thread via GitHub


showuon commented on PR #16048:
URL: https://github.com/apache/kafka/pull/16048#issuecomment-2164106388

   @zhaochun-ma , nice find! I've created 
[KAFKA-16946](https://issues.apache.org/jira/browse/KAFKA-16946) for this 
issue. Are you interested in opening a PR to fix it?





[jira] [Created] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port

2024-06-12 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16946:
-

 Summary: Utils.getHost/getPort cannot parse 
SASL_PLAINTEXT://host:port
 Key: KAFKA-16946
 URL: https://issues.apache.org/jira/browse/KAFKA-16946
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.8.0
Reporter: Luke Chen


In KAFKA-16824, we tried to improve the regex for Utils.getHost/getPort, but it 
now fails to parse SASL_PLAINTEXT://host:port. This needs to be fixed.
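A sketch of a host/port parser that accepts all four Kafka security protocol prefixes (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) is below. The regex and class are hypothetical and not Kafka's actual `Utils` implementation; the assumption is that the essential point is letting the optional `PROTOCOL://` prefix contain underscores, while host names may contain letters, digits, `_`, `.` and `-`, and IPv6 literals appear in brackets.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HostPortSketch {
    // Optional "PROTOCOL://" prefix (letters, digits, underscores, e.g. SASL_PLAINTEXT),
    // then a bracketed IPv6 literal or a host name, then ":" and a 1-5 digit port.
    private static final Pattern HOST_PORT = Pattern.compile(
        "^(?:[a-zA-Z][a-zA-Z0-9_]*://)?(\\[[0-9a-zA-Z:.%]+\\]|[0-9a-zA-Z_.-]+):([0-9]{1,5})$");

    // Returns the host, or null when the address does not match.
    public static String getHost(String address) {
        Matcher m = HOST_PORT.matcher(address);
        if (!m.matches()) {
            return null;
        }
        String host = m.group(1);
        // Strip the brackets from IPv6 literals, e.g. "[::1]" -> "::1".
        return host.startsWith("[") ? host.substring(1, host.length() - 1) : host;
    }

    // Returns the port, or null when the address does not match or the port is out of range.
    public static Integer getPort(String address) {
        Matcher m = HOST_PORT.matcher(address);
        if (!m.matches()) {
            return null;
        }
        int port = Integer.parseInt(m.group(2)); // at most 5 digits, so no overflow
        return port <= 65535 ? Integer.valueOf(port) : null;
    }
}
```

A regression test could then loop over the four protocol names (for example with JUnit's `@ParameterizedTest`) and check the same host and port assertions for each prefix.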





[PR] KAFKA-15045: (KIP-924 pt. 25) HA Task Assignor implemented [kafka]

2024-06-12 Thread via GitHub


apourchet opened a new pull request, #16315:
URL: https://github.com/apache/kafka/pull/16315

   This PR implements the HA task assignor with the new KIP 924 API.
   
   We also add this new assignor to the existing suite of 
`StreamsPartitionAssignorTest`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16939) Revisit ConfigCommandIntegrationTest

2024-06-12 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854573#comment-17854573
 ] 

黃竣陽 commented on KAFKA-16939:
-

I will fix it ASAP.

> Revisit ConfigCommandIntegrationTest
> 
>
> Key: KAFKA-16939
> URL: https://issues.apache.org/jira/browse/KAFKA-16939
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> ConfigCommandIntegrationTest has some invalid operations
> 1. the base command should not include the broker id [0]
> 2. don't set the broker id when we are looking for default value [1]
> [0] 
> https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java#L477
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java#L496
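The two points above amount to keeping the broker id out of the shared base arguments and choosing between `--entity-name <id>` and `--entity-default` per call. A sketch of that argument building for `kafka-configs.sh` follows; the helper names are hypothetical and this is not the test's actual code, although `--bootstrap-server`, `--entity-type`, `--entity-name`, `--entity-default`, `--alter` and `--add-config` are existing options of the tool.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ConfigCommandArgsSketch {
    // Shared base command: deliberately contains no broker id.
    public static List<String> baseArgs(String bootstrapServers) {
        return new ArrayList<>(Arrays.asList(
            "--bootstrap-server", bootstrapServers, "--entity-type", "brokers"));
    }

    // Targets one broker when an id is given, otherwise the cluster-wide default.
    public static List<String> alterArgs(String bootstrapServers, Optional<String> brokerId, String config) {
        List<String> args = baseArgs(bootstrapServers);
        if (brokerId.isPresent()) {
            args.add("--entity-name");
            args.add(brokerId.get());
        } else {
            args.add("--entity-default");
        }
        args.add("--alter");
        args.add("--add-config");
        args.add(config);
        return args;
    }
}
```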





Re: [PR] MINOR: Remove Java 7 example code [kafka]

2024-06-12 Thread via GitHub


mjsax commented on PR #16308:
URL: https://github.com/apache/kafka/pull/16308#issuecomment-2164092348

   Thanks for the PR @JimGalasyn! Merged to `trunk` and cherry-picked to the `3.8` 
branch.





Re: [PR] MINOR: Remove Java 7 example code [kafka]

2024-06-12 Thread via GitHub


mjsax merged PR #16308:
URL: https://github.com/apache/kafka/pull/16308





Re: [PR] KAFKA-16890: Compute valid log-start-offset when deleting overlapping remote segments [kafka]

2024-06-12 Thread via GitHub


satishd commented on PR #16237:
URL: https://github.com/apache/kafka/pull/16237#issuecomment-2164087268

   There were a few unrelated test failures, merging to trunk and 3.8.





Re: [PR] KAFKA-16890: Compute valid log-start-offset when deleting overlapping remote segments [kafka]

2024-06-12 Thread via GitHub


satishd merged PR #16237:
URL: https://github.com/apache/kafka/pull/16237





[PR] KAFKA-15045: (KIP-924 pt. 24) Correct stateful task flagging [kafka]

2024-06-12 Thread via GitHub


apourchet opened a new pull request, #16314:
URL: https://github.com/apache/kafka/pull/16314

   This PR fixes the way we flag tasks as stateful in the KIP-924 codepath to 
mimic the way it is done in the old codepath.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] MINOR: update Kafka Streams docs with 3.2 KIP information [kafka]

2024-06-12 Thread via GitHub


mjsax opened a new pull request, #16313:
URL: https://github.com/apache/kafka/pull/16313

   (no comment)





Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-06-12 Thread via GitHub


junrao commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1637247133


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -228,7 +231,7 @@ public enum MetadataVersion {
  * Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
  * IT CANNOT BE CHANGED.
  */
-public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4;
+public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV1;

Review Comment:
   ELR was originally associated with IBP_3_7_IV3. When it was moved to IBP_3_8_IV0, 
we left IBP_3_7_IV3 unused since IBP_3_7_IV4 had already been implemented; we 
didn't reuse IBP_3_7_IV3. Here, there is no 3.8 IV after IBP_3_8_IV0, so we 
could reuse IBP_3_8_IV0 for ListOffset. However, it seems that a simpler and 
more consistent policy is to never reuse an IV?






Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-06-12 Thread via GitHub


jolshan commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1637231197


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -228,7 +231,7 @@ public enum MetadataVersion {
  * Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
  * IT CANNOT BE CHANGED.
  */
-public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4;
+public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV1;

Review Comment:
   I'm not sure I understand the benefit of leaving it unused. I suspect that 
if there are many ongoing features, we will end up with a lot of unused versions, 
and that is confusing in its own way. I was hoping the details of KIP-1014 would be 
finalized by now so that there was clear guidance for this sort of situation.





