Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-08-15 Thread via GitHub


junrao commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1719022889


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import kafka.server.ReplicaManager;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.share.ShareAcknowledgementBatch;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+import org.apache.kafka.storage.internals.log.FetchPartitionData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * The SharePartition is used to track the state of a partition that is shared between multiple
+ * consumers. The class maintains the state of the records that have been fetched from the leader
+ * and are in-flight.
+ */
+public class SharePartition {
+
+    private static final Logger log = LoggerFactory.getLogger(SharePartition.class);
+
+    /**
+     * empty member id used to indicate when a record is not acquired by any member.
+     */
+    static final String EMPTY_MEMBER_ID = Uuid.ZERO_UUID.toString();
+
+    /**
+     * The RecordState is used to track the state of a record that has been fetched from the leader.
+     * The state of the records determines if the records should be re-delivered, move the next fetch
+     * offset, or be state persisted to disk.
+     */
+    // Visible for testing
+    enum RecordState {
+        AVAILABLE((byte) 0),
+        ACQUIRED((byte) 1),
+        ACKNOWLEDGED((byte) 2),
+        ARCHIVED((byte) 4);
+
+        public final byte id;
+
+        RecordState(byte id) {
+            this.id = id;
+        }
+
+        /**
+         * Validates that the newState is one of the valid transition from the current
+         * {@code RecordState}.
+         *
+         * @param newState State into which requesting to transition; must be non-null
+         *
+         * @return {@code RecordState} newState if validation succeeds. Returning
+         *         newState helps state assignment chaining.
+         *
+         * @throws IllegalStateException if the state transition validation fails.
+         */
+        public RecordState validateTransition(RecordState newState) throws IllegalStateException {
+            Objects.requireNonNull(newState, "newState cannot be null");
+            if (this == newState) {
+                throw new IllegalStateException("The state transition is invalid as the new state is"
+                    + "the same as the current state");
+            }
+
+            if (this == ACKNOWLEDGED || this == ARCHIVED) {
+                throw new IllegalStateException("The state transition is invalid from the current state: " + this);
+            }
+
+            if (this == AVAILABLE && newState != ACQUIRED) {
+                throw new IllegalStateException("The state can only be transitioned to ACQUIRED from AVAILABLE");
+            }
+
+            // Either the transition is from Available -> Acquired or from Acquired -> Available/
+            // Acknowledged/Archived.
+            return newState;
+        }
+
+        public static RecordState forId(byte id) {
+            switch (id) {
+                case 0:
+                    return AVAILABLE;
+                case 1:
+                    return ACQUIRED;
+                case 2:
+                    return ACKNOWLEDGED;
+                case 4:
+                    return ARCHIVED;
+
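For readers skimming the thread, the rules encoded above are: AVAILABLE may only move to ACQUIRED; ACQUIRED may move back to AVAILABLE or on to ACKNOWLEDGED/ARCHIVED; ACKNOWLEDGED and ARCHIVED are terminal. A minimal standalone sketch of those rules, with simplified messages and a hypothetical enum name (not the PR's code):

// Standalone sketch of the transition rules in the quoted RecordState enum (illustrative only).
enum RecordStateSketch {
    AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED;

    RecordStateSketch validateTransition(RecordStateSketch newState) {
        java.util.Objects.requireNonNull(newState, "newState cannot be null");
        if (this == newState) {
            throw new IllegalStateException("New state equals current state: " + this);
        }
        if (this == ACKNOWLEDGED || this == ARCHIVED) {
            throw new IllegalStateException("No transition allowed out of terminal state: " + this);
        }
        if (this == AVAILABLE && newState != ACQUIRED) {
            throw new IllegalStateException("AVAILABLE may only move to ACQUIRED");
        }
        // Remaining legal moves: AVAILABLE -> ACQUIRED, or ACQUIRED -> AVAILABLE/ACKNOWLEDGED/ARCHIVED.
        return newState;
    }

    public static void main(String[] args) {
        // Legal: a record is acquired and then acknowledged.
        RecordStateSketch state = AVAILABLE.validateTransition(ACQUIRED);
        state = state.validateTransition(ACKNOWLEDGED);
        // Illegal: terminal states reject any further transition.
        try {
            state.validateTransition(AVAILABLE);
        } catch (IllegalStateException e) {
            System.out.println("Rejected as expected: " + e.getMessage());
        }
    }
}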

Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-08-15 Thread via GitHub


apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1718871933


##
core/src/main/java/kafka/server/share/SharePartition.java:
##

Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-08-15 Thread via GitHub


junrao commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1718846109


##
core/src/main/java/kafka/server/share/SharePartition.java:
##

Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-08-15 Thread via GitHub


apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1718405279


##
core/src/main/java/kafka/server/share/SharePartition.java:
##

Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-08-15 Thread via GitHub


apoorvmittal10 commented on PR #16274:
URL: https://github.com/apache/kafka/pull/16274#issuecomment-2291269448

   > @apoorvmittal10 : Thanks for the PR. Left a few late review comments.
   
   @junrao Thanks for the review. I have opened a minor PR to address the review comments: https://github.com/apache/kafka/pull/16891



Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-08-14 Thread via GitHub


junrao commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1700968453


##
core/src/main/java/kafka/server/share/SharePartition.java:
##

Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-13 Thread via GitHub


omkreddy merged PR #16274:
URL: https://github.com/apache/kafka/pull/16274



Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-13 Thread via GitHub


apoorvmittal10 commented on PR #16274:
URL: https://github.com/apache/kafka/pull/16274#issuecomment-2166734335

   @omkreddy Unrelated test failures in the build.



Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-13 Thread via GitHub


apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1638137205


##
core/src/test/java/kafka/server/SharePartitionTest.java:
##
@@ -63,4 +111,172 @@ public void testRecordStateForId() {
         // Invalid check.
         assertThrows(IllegalArgumentException.class, () -> RecordState.forId((byte) 5));
     }
+
+    @Test
+    public void testAcquireSingleRecord() {
+        SharePartition sharePartition = SharePartitionBuilder.builder().build();
+        MemoryRecords records = memoryRecords(1);
+
+        CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 3, 0, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        List<AcquiredRecords> acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(1, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+        assertEquals(0, sharePartition.cachedState().get(0L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).batchState());
+        assertEquals(MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId());
+        assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(0L).offsetState());
+    }
+
+    @Test
+    public void testAcquireMultipleRecords() {
+        SharePartition sharePartition = SharePartitionBuilder.builder().build();
+        MemoryRecords records = memoryRecords(5, 10);
+
+        CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        List<AcquiredRecords> acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());
+        assertEquals(MEMBER_ID, sharePartition.cachedState().get(10L).batchMemberId());
+        assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+    }
+
+    @Test
+    public void testAcquireMultipleRecordsWithOverlapAndNewBatch() {
+        SharePartition sharePartition = SharePartitionBuilder.builder().build();
+        MemoryRecords records = memoryRecords(5, 0);
+
+        CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        List<AcquiredRecords> acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        // Add records from 0-9 offsets, 5-9 should be acquired and 0-4 should be ignored.
+        records = memoryRecords(10, 0);
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+        acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(memoryRecords(5, 5), 1).toArray(), acquiredRecordsList.toArray());

Review Comment:
   The subset batch path triggers when a different state is being tracked for part of the batch. I'll be adding the `acknowledge` functionality in the next PR and will add additional tests along with it.
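For context on what tracking a different state for part of a batch means, here is a toy illustration of per-offset state tracking for a subset acquire, assuming a simplified two-state model rather than the PR's InFlightState/offsetState types:

import java.util.NavigableMap;
import java.util.TreeMap;

// Toy illustration: a cached batch 10-14 is expanded into per-offset entries when only
// the subset 12-13 is acquired, so different offsets of the same batch can hold different states.
public class SubsetAcquireSketch {
    enum State { AVAILABLE, ACQUIRED }

    public static void main(String[] args) {
        // Batch 10-14 starts out with every offset AVAILABLE.
        NavigableMap<Long, State> offsetState = new TreeMap<>();
        for (long offset = 10; offset <= 14; offset++) {
            offsetState.put(offset, State.AVAILABLE);
        }

        // Acquire only the requested subset 12-13; the remaining offsets keep their own state.
        for (long offset = 12; offset <= 13; offset++) {
            offsetState.computeIfPresent(offset, (k, s) -> s == State.AVAILABLE ? State.ACQUIRED : s);
        }

        System.out.println(offsetState);
        // {10=AVAILABLE, 11=AVAILABLE, 12=ACQUIRED, 13=ACQUIRED, 14=AVAILABLE}
    }
}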




Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-13 Thread via GitHub


apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1638135999


##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -238,8 +245,77 @@ public static RecordState forId(byte id) {
      * @return The next fetch offset that should be fetched from the leader.
      */
     public long nextFetchOffset() {
-        // TODO: Implement the logic to compute the next fetch offset.
-        return 0;
+        /*
+        The logic for determining the next offset to fetch data from a Share Partition hinges on a
+        flag called findNextFetchOffset. If this flag is set to true, then the next fetch offset
+        should be re-computed, otherwise the next fetch offset is Share Partition End Offset + 1.
+        The flag is set to true in the following cases:
+        1. When some previously acquired records are acknowledged with type RELEASE.
+        2. When the record lock duration expires for some acquired records.
+        3. When some records are released on share session close.
+        The re-computation of next fetch offset is done by iterating over the cachedState and finding
+        the first available record. If no available record is found, then the next fetch offset is
+        set to Share Partition End Offset + 1 and findNextFetchOffset flag is set to false.
+        */
+        lock.writeLock().lock();
+        try {
+            // When none of the records in the cachedState are in the AVAILABLE state, findNextFetchOffset will be false
+            if (!findNextFetchOffset.get()) {
+                if (cachedState.isEmpty() || startOffset > cachedState.lastEntry().getValue().lastOffset()) {
+                    // 1. When cachedState is empty, endOffset is set to the next offset of the last offset removed from
+                    // batch, which is the next offset to be fetched.
+                    // 2. When startOffset has moved beyond the in-flight records, startOffset and endOffset point to the LSO,
+                    // which is the next offset to be fetched.
+                    return endOffset;
+                } else {
+                    return endOffset + 1;
+                }
+            }
+
+            // If this piece of code is reached, it means that findNextFetchOffset is true

Review Comment:
   I have added some tests which verify some scenarios with `findNextFetchOffset` as true. For more tests I require the `acknowledge` functionality and will be adding those in the next PR.
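A minimal standalone sketch of the re-computation path described in the quoted comment, using a hypothetical Batch type instead of the PR's cachedState of InFlightBatch entries:

import java.util.NavigableMap;
import java.util.TreeMap;

public class NextFetchOffsetSketch {
    // Simplified model: each in-flight batch is either AVAILABLE for re-delivery or not.
    record Batch(long firstOffset, long lastOffset, boolean available) { }

    // Re-compute the next fetch offset by scanning cached batches for the first AVAILABLE one;
    // fall back to endOffset + 1 when nothing is available, as the quoted description says.
    static long nextFetchOffset(NavigableMap<Long, Batch> cachedState, long endOffset) {
        for (Batch batch : cachedState.values()) {
            if (batch.available()) {
                return batch.firstOffset();
            }
        }
        return endOffset + 1;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Batch> cachedState = new TreeMap<>();
        cachedState.put(0L, new Batch(0, 4, false));  // acquired, not available
        cachedState.put(5L, new Batch(5, 9, true));   // released, available for re-delivery
        System.out.println(nextFetchOffset(cachedState, 9)); // prints 5
    }
}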




Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-13 Thread via GitHub


apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1637993977


##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -310,126 +485,361 @@ private void initialize() {
         // TODO: Provide implementation to initialize the share partition.
     }
 
+    private AcquiredRecords acquireNewBatchRecords(
+        String memberId,
+        long firstOffset,
+        long lastOffset
+    ) {
+        lock.writeLock().lock();
+        try {
+            // Schedule acquisition lock timeout for the batch.
+            AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset);
+            // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
+            cachedState.put(firstOffset, new InFlightBatch(
+                memberId,
+                firstOffset,
+                lastOffset,
+                RecordState.ACQUIRED,
+                1,
+                timerTask));
+            // if the cachedState was empty before acquiring the new batches then startOffset needs to be updated
+            if (cachedState.firstKey() == firstOffset) {
+                startOffset = firstOffset;
+            }
+            endOffset = lastOffset;
+            return new AcquiredRecords()
+                .setFirstOffset(firstOffset)
+                .setLastOffset(lastOffset)
+                .setDeliveryCount((short) 1);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void acquireSubsetBatchRecords(
+        String memberId,
+        long requestFirstOffset,
+        long requestLastOffset,
+        InFlightBatch inFlightBatch,
+        List<AcquiredRecords> result
+    ) {
+        lock.writeLock().lock();
+        try {
+            for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
+                // For the first batch which might have offsets prior to the request base
+                // offset i.e. cached batch of 10-14 offsets and request batch of 12-13.
+                if (offsetState.getKey() < requestFirstOffset) {
+                    continue;
+                }
+
+                if (offsetState.getKey() > requestLastOffset) {
+                    // No further offsets to process.
+                    break;
+                }
+
+                if (offsetState.getValue().state != RecordState.AVAILABLE) {
+                    log.trace("The offset is not available skipping, offset: {} batch: {}"
+                        + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch,
+                        groupId, topicIdPartition);
+                    continue;
+                }
+
+                InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount,
+                    memberId);
+                if (updateResult == null) {
+                    log.trace("Unable to acquire records for the offset: {} in batch: {}"
+                        + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch,
+                        groupId, topicIdPartition);
+                    continue;
+                }
+                // Schedule acquisition lock timeout for the offset.
+                AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey());
+                // Update acquisition lock timeout task for the offset.
+                offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
+
+                // TODO: Maybe we can club the continuous offsets here.
+                result.add(new AcquiredRecords()
+                    .setFirstOffset(offsetState.getKey())
+                    .setLastOffset(offsetState.getKey())
+                    .setDeliveryCount((short) offsetState.getValue().deliveryCount));
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     /**
-     * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records.
+     * Check if the in-flight batch is a full match with the request offsets. The full match represents
+     * complete overlap of the in-flight batch with the request offsets.
+     *
+     * @param inFlightBatch The in-flight batch to check for full match.
+     * @param firstOffsetToCompare The first offset of the request batch.
+     * @param lastOffsetToCompare The last offset of the request batch.
+     *
+     * @return True if the in-flight batch is a full match with the request offsets, false otherwise.
      */
-    private static class InFlightBatch {
-        /**
-         * The offset of the first record in the batch that is fetched from the log.
-         */
+    private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetTo
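Based on the javadoc above, the full-match check presumably reduces to a containment test of the in-flight batch within the requested range; a small sketch under that assumption, using a hypothetical InFlightRange stand-in for the PR's InFlightBatch:

public class FullMatchSketch {
    // Hypothetical stand-in for the PR's InFlightBatch: just a first/last offset pair.
    record InFlightRange(long firstOffset, long lastOffset) { }

    // "Full match" as described in the quoted javadoc: the cached in-flight batch lies
    // entirely within the requested fetch range.
    static boolean checkForFullMatch(InFlightRange inFlightBatch, long firstOffsetToCompare, long lastOffsetToCompare) {
        return inFlightBatch.firstOffset() >= firstOffsetToCompare
            && inFlightBatch.lastOffset() <= lastOffsetToCompare;
    }

    public static void main(String[] args) {
        InFlightRange cached = new InFlightRange(10, 14);
        System.out.println(checkForFullMatch(cached, 10, 14)); // true: exact overlap
        System.out.println(checkForFullMatch(cached, 8, 20));  // true: the request spans the whole cached batch
        System.out.println(checkForFullMatch(cached, 12, 13)); // false: request 12-13 is only a subset of 10-14
    }
}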

Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-13 Thread via GitHub


apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1637992948


##
core/src/main/java/kafka/server/SharePartition.java:
##

Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-12 Thread via GitHub


adixitconfluent commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1637579381


##
core/src/main/java/kafka/server/SharePartition.java:
##

Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-12 Thread via GitHub


adixitconfluent commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1637574165


##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -54,6 +60,7 @@ public class SharePartition {
      * The state of the records determines if the records should be re-delivered, move the next fetch
      * offset, or be state persisted to disk.
      */
+    // Visible for testing
     public enum RecordState {

Review Comment:
   Can we make it `default` instead of `public`?




Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-12 Thread via GitHub


omkreddy commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1636921661


##
core/src/test/java/kafka/server/SharePartitionTest.java:
##

Review Comment:
   Any tests which acquire multiple subset batches?



##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -238,8 +245,77 @@ public static RecordState forId(byte id) {
      * @return The next fetch offset that should be fetched from the leader.
      */
     public long nextFetchOffset() {
-        // TODO: Implement the logic to compute the next fetch offset.
-        return 0;
+        /*
+        The logic for determining the next offset to fetch data from a Share Partition hinges on a
+        flag called findNextFetchOffset. If this flag is set to true, then the next fetch offset
+        should be re-computed, otherwise the next fetch offset is Share Partition End Offset + 1.

Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-12 Thread via GitHub


apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1636681441


##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -238,8 +245,77 @@ public static RecordState forId(byte id) {
      * @return The next fetch offset that should be fetched from the leader.
      */
     public long nextFetchOffset() {
-        // TODO: Implement the logic to compute the next fetch offset.
-        return 0;
+        /*
+        The logic for determining the next offset to fetch data from a Share Partition hinges on a
+        flag called findNextFetchOffset. If this flag is set to true, then the next fetch offset
+        should be re-computed, otherwise the next fetch offset is Share Partition End Offset + 1.
+        The flag is set to true in the following cases:
+        1. When some previously acquired records are acknowledged with type RELEASE.
+        2. When the record lock duration expires for some acquired records.
+        3. When some records are released on share session close.
+        The re-computation of next fetch offset is done by iterating over the cachedState and finding
+        the first available record. If no available record is found, then the next fetch offset is
+        set to Share Partition End Offset + 1 and findNextFetchOffset flag is set to false.
+        */
+        lock.writeLock().lock();
+        try {
+            // When none of the records in the cachedState are in the AVAILABLE state, findNextFetchOffset will be false
+            if (!findNextFetchOffset.get()) {
+                if (cachedState.isEmpty() || startOffset > cachedState.lastEntry().getValue().lastOffset) {
+                    // 1. When cachedState is empty, endOffset is set to the next offset of the last offset removed from
+                    // batch, which is the next offset to be fetched.
+                    // 2. When startOffset has moved beyond the in-flight records, startOffset and endOffset point to the LSO,
+                    // which is the next offset to be fetched.
+                    return endOffset;

Review Comment:
   Yeah, that's what my initial impression was. This concern only occurs when the Log Start Offset (LSO) moves past the already-fetched data. In that case, we would like to fetch from the last known endOffset, which is the LSO; otherwise it'll be endOffset + 1.




Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-12 Thread via GitHub


apoorvmittal10 commented on PR #16274:
URL: https://github.com/apache/kafka/pull/16274#issuecomment-2163312682

   > Since we have implemented the acquisition lock timeout in this PR, shall we add the tests corresponding to it as well, for example - test acquisition lock post acquiring a single record?
   
   That's yet to be implemented. Maybe once this PR is merged you can implement the method for acquisition locks.
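For readers unfamiliar with the acquisition lock being discussed: an acquired record is meant to fall back to AVAILABLE if it is not acknowledged within the lock duration. A toy model of that behaviour with a plain scheduler (the PR itself uses the server's Timer/TimerTask machinery, and the expiry handling is the part that is still to be implemented):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Toy model of an acquisition-lock timeout: an acquired record reverts to AVAILABLE
// if it has not been acknowledged before the lock duration elapses.
public class AcquisitionLockTimeoutSketch {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicReference<State> state = new AtomicReference<>(State.ACQUIRED);

        long lockDurationMs = 200;
        // On expiry, release the record only if it is still ACQUIRED (i.e. never acknowledged).
        timer.schedule(() -> { state.compareAndSet(State.ACQUIRED, State.AVAILABLE); },
            lockDurationMs, TimeUnit.MILLISECONDS);

        Thread.sleep(300);
        System.out.println(state.get()); // AVAILABLE: the lock expired before an acknowledgement
        timer.shutdown();
    }
}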



Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-12 Thread via GitHub


apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1636612042


##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -257,11 +333,110 @@ public CompletableFuture> acquire(
 FetchPartitionData fetchPartitionData
 ) {
 log.trace("Received acquire request for share partition: {}-{}", 
memberId, fetchPartitionData);
+RecordBatch lastBatch = 
fetchPartitionData.records.lastBatch().orElse(null);
+if (lastBatch == null) {
+// Nothing to acquire.
+return CompletableFuture.completedFuture(Collections.emptyList());
+}
 
-CompletableFuture> future = new 
CompletableFuture<>();
-future.completeExceptionally(new UnsupportedOperationException("Not 
implemented"));
+// We require the first batch of records to get the base offset. Stop 
parsing further
+// batches.
+RecordBatch firstBatch = 
fetchPartitionData.records.batches().iterator().next();
+lock.writeLock().lock();
+try {
+long baseOffset = firstBatch.baseOffset();
+// Find the floor batch record for the request batch. The request 
batch could be
+// for a subset of the in-flight batch i.e. cached batch of offset 
10-14 and request batch
+// of 12-13. Hence, floor entry is fetched to find the sub-map.
+Map.Entry floorOffset = 
cachedState.floorEntry(baseOffset);
+// We might find a batch with floor entry but not necessarily that 
batch has an overlap,
+// if the request batch base offset is ahead of last offset from 
floor entry i.e. cached
+// batch of 10-14 and request batch of 15-18, though floor entry 
is found but no overlap.
+if (floorOffset != null && floorOffset.getValue().lastOffset >= 
baseOffset) {
+baseOffset = floorOffset.getKey();
+}
+// Validate if the fetch records are already part of existing 
batches and if available.
+NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true);
+// No overlap with request offsets in the cache for in-flight 
records. Acquire the complete
+// batch.
+if (subMap.isEmpty()) {
+log.trace("No cached data exists for the share partition for 
requested fetch batch: {}-{}",
+groupId, topicIdPartition);
+return CompletableFuture.completedFuture(Collections.singletonList(
+acquireNewBatchRecords(memberId, firstBatch.baseOffset(), lastBatch.lastOffset())));
+}
 
-return future;
+log.trace("Overlap exists with in-flight records. Acquire the 
records if available for"
++ " the share group: {}-{}", groupId, topicIdPartition);
+List<AcquiredRecords> result = new ArrayList<>();
+// The fetched records are already part of the in-flight records. 
The records might
+// be available for re-delivery hence try acquiring same. The 
request batches could
+// be an exact match, subset or span over multiple already fetched 
batches.
+for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+InFlightBatch inFlightBatch = entry.getValue();
+// Compute if the batch is a full match.
+boolean fullMatch = checkForFullMatch(inFlightBatch, 
firstBatch.baseOffset(), lastBatch.lastOffset());
+
+if (!fullMatch || inFlightBatch.offsetState != null) {
+log.trace("Subset or offset tracked batch record found for 
share partition,"
++ " batch: {} request offsets - first: {}, last: 
{} for the share"
++ " group: {}-{}", inFlightBatch, 
firstBatch.baseOffset(),
+lastBatch.lastOffset(), groupId, topicIdPartition);
+if (inFlightBatch.offsetState == null) {
+// Though the request is a subset of in-flight batch 
but the offset
+// tracking has not been initialized yet which means 
that we could only
+// acquire subset of offsets from the in-flight batch 
but only if the
+// complete batch is available yet. Hence, do a 
pre-check to avoid exploding
+// the in-flight offset tracking unnecessarily.
+if (inFlightBatch.batchState() != 
RecordState.AVAILABLE) {
+log.trace("The batch is not available to acquire 
in share group: {}-{}, skipping: {}"
++ " skipping offset tracking for batch as 
well.", groupId,
+topicIdPartition, inFlightBatch);
+continue;
+}
+// The request batch is a subset or per offset s
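
   The floor-entry and sub-map lookup quoted above can be reproduced with a small
self-contained example (plain Long last offsets stand in for InFlightBatch values):
the floor batch is included only when it actually overlaps the requested range.

   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   // Simplified overlap lookup: cached batches keyed by first offset, value = last offset.
   public class OverlapLookupSketch {
       public static void main(String[] args) {
           NavigableMap<Long, Long> cachedState = new TreeMap<>();
           cachedState.put(10L, 14L);   // in-flight batch 10-14
           cachedState.put(15L, 18L);   // in-flight batch 15-18

           long requestFirst = 12L, requestLast = 13L;
           // Include the floor batch only if its last offset reaches the request's first offset.
           Map.Entry<Long, Long> floor = cachedState.floorEntry(requestFirst);
           long from = (floor != null && floor.getValue() >= requestFirst) ? floor.getKey() : requestFirst;
           NavigableMap<Long, Long> overlap = cachedState.subMap(from, true, requestLast, true);
           System.out.println(overlap);   // {10=14}: request 12-13 is a subset of batch 10-14
       }
   }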

Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-12 Thread via GitHub


apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1636601366


##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -310,37 +485,264 @@ private void initialize() {
 // TODO: Provide implementation to initialize the share partition.
 }
 
+private AcquiredRecords acquireNewBatchRecords(
+String memberId,
+long firstOffset,
+long lastOffset
+) {
+lock.writeLock().lock();
+try {
+// Schedule acquisition lock timeout for the batch.
+AcquisitionLockTimerTask timerTask = 
scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset);
+// Add the new batch to the in-flight records along with the 
acquisition lock timeout task for the batch.
+cachedState.put(firstOffset, new InFlightBatch(
+memberId,
+firstOffset,
+lastOffset,
+RecordState.ACQUIRED,
+1,
+timerTask));
+// if the cachedState was empty before acquiring the new batches 
then startOffset needs to be updated
+if (cachedState.firstKey() == firstOffset)  {
+startOffset = firstOffset;
+}
+endOffset = lastOffset;
+return new AcquiredRecords()
+.setFirstOffset(firstOffset)
+.setLastOffset(lastOffset)
+.setDeliveryCount((short) 1);
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+private void acquireSubsetBatchRecords(
+String memberId,
+long requestFirstOffset,
+long requestLastOffset,
+InFlightBatch inFlightBatch,
+List<AcquiredRecords> result
+) {
+lock.writeLock().lock();
+try {
+for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
+// For the first batch which might have offsets prior to the 
request base
+// offset i.e. cached batch of 10-14 offsets and request batch 
of 12-13.
+if (offsetState.getKey() < requestFirstOffset) {
+continue;
+}
+
+if (offsetState.getKey() > requestLastOffset) {
+// No further offsets to process.
+break;
+}
+
+if (offsetState.getValue().state != RecordState.AVAILABLE) {
+log.trace("The offset is not available skipping, offset: 
{} batch: {}"
++ " for the share group: {}-{}", 
offsetState.getKey(), inFlightBatch,
+groupId, topicIdPartition);
+continue;
+}
+
+InFlightState updateResult =  
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, 
maxDeliveryCount,
+memberId);
+if (updateResult == null) {
+log.trace("Unable to acquire records for the offset: {} in 
batch: {}"
++ " for the share group: {}-{}", 
offsetState.getKey(), inFlightBatch,
+groupId, topicIdPartition);
+continue;
+}
+// Schedule acquisition lock timeout for the offset.
+AcquisitionLockTimerTask acquisitionLockTimeoutTask = 
scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), 
offsetState.getKey());
+// Update acquisition lock timeout task for the offset.
+
offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
+
+// TODO: Maybe we can club the continuous offsets here.
+result.add(new AcquiredRecords()
+.setFirstOffset(offsetState.getKey())
+.setLastOffset(offsetState.getKey())
+.setDeliveryCount((short) 
offsetState.getValue().deliveryCount));
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String 
memberId, long firstOffset, long lastOffset) {
+return scheduleAcquisitionLockTimeout(memberId, firstOffset, 
lastOffset, recordLockDurationMs);
+}
+
+// TODO: maxDeliveryCount should be utilized here once it is implemented
 /**
- * The InFlightBatch maintains the in-memory state of the fetched records 
i.e. in-flight records.
+ * Apply acquisition lock to acquired records.
+ * @param memberId The member id of the client that is putting the 
acquisition lock.
+ * @param firstOffset The first offset of the acquired records.
+ * @param lastOffset The last offset of the acquired records.
  */

Review Comment:
   Thanks, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
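
   The `TODO: Maybe we can club the continuous offsets here` in the hunk above amounts
to collapsing consecutive acquired offsets into AcquiredRecords-style ranges. A
self-contained sketch of that coalescing step (simplified long[] pairs instead of
AcquiredRecords, so nothing here is the PR's API):

   import java.util.ArrayList;
   import java.util.List;

   // Collapse a sorted list of individually acquired offsets into contiguous ranges,
   // e.g. 12, 13, 14, 17 -> [12-14], [17-17].
   public class CoalesceOffsetsSketch {
       public static void main(String[] args) {
           List<Long> acquired = List.of(12L, 13L, 14L, 17L);
           List<long[]> ranges = new ArrayList<>();
           long first = acquired.get(0);
           long last = first;
           for (int i = 1; i < acquired.size(); i++) {
               long offset = acquired.get(i);
               if (offset == last + 1) {
                   last = offset;                       // extend the current range
               } else {
                   ranges.add(new long[]{first, last}); // close the range, start a new one
                   first = last = offset;
               }
           }
           ranges.add(new long[]{first, last});
           ranges.forEach(r -> System.out.println(r[0] + "-" + r[1])); // 12-14 and 17-17
       }
   }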


Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-12 Thread via GitHub


apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1636568658


##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -350,87 +752,73 @@ public String toString() {
  * fetched from the leader. The state of the record is used to determine 
if the record should
  * be re-deliver or if it can be acknowledged or archived.
  */
-private static class InFlightState {
-/**
- * The state of the fetch batch records.
- */
+static final class InFlightState {
+
+// The state of the fetch batch records.
 private RecordState state;
-/**
- * The number of times the records has been delivered to the client.
- */
+// The number of times the records has been delivered to the client.
 private int deliveryCount;
-/**
- * The member id of the client that is fetching/acknowledging the 
record.
- */
+// The member id of the client that is fetching/acknowledging the 
record.
 private String memberId;
+// The timer task for the acquisition lock timeout.
+private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+
 
 InFlightState(RecordState state, int deliveryCount, String memberId) {
+this(state, deliveryCount, memberId, null);
+}
+
+InFlightState(RecordState state, int deliveryCount, String memberId, 
AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
 this.state = state;
 this.deliveryCount = deliveryCount;
 this.memberId = memberId;
+this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
 }
 
-@Override
-public int hashCode() {
-return Objects.hash(state, deliveryCount, memberId);
+void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask 
acquisitionLockTimeoutTask) {
+this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
 }
 
-@Override
-public boolean equals(Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
+void cancelAndClearAcquisitionLockTimeoutTask() {
+acquisitionLockTimeoutTask.cancel();
+acquisitionLockTimeoutTask = null;
+}
+
+/**
+ * Try to update the state of the records. The state of the records 
can only be updated if the
+ * new state is allowed to be transitioned from old state. The 
delivery count is not incremented
+ * if the state update is unsuccessful.
+ *
+ * @param newState The new state of the records.
+ * @param incrementDeliveryCount Whether to increment the delivery 
count.
+ *
+ * @return {@code InFlightState} if update succeeds, null otherwise. 
Returning state
+ * helps update chaining.
+ */
+private InFlightState tryUpdateState(RecordState newState, boolean 
incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
+try {
+if (newState == RecordState.AVAILABLE && deliveryCount >= 
maxDeliveryCount) {
+newState = RecordState.ARCHIVED;
+}
+state = state.validateTransition(newState);
+if (incrementDeliveryCount && newState != 
RecordState.ARCHIVED) {
+deliveryCount++;
+}
+memberId = newMemberId;
+return this;
+} catch (IllegalStateException e) {
+log.info("Failed to update state of the records", e);

Review Comment:
   I have moved it to error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
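
   The transition rule under review above can be isolated in a small standalone sketch
(simplified enum and method, not the PR's class): a record that would return to
AVAILABLE is archived once its delivery count has reached the configured maximum.

   // Standalone illustration of the max-delivery-count rule in tryUpdateState().
   public class DeliveryCapSketch {
       enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

       static RecordState nextState(RecordState requested, int deliveryCount, int maxDeliveryCount) {
           // A record heading back to AVAILABLE is archived once it has already been
           // delivered maxDeliveryCount times.
           if (requested == RecordState.AVAILABLE && deliveryCount >= maxDeliveryCount) {
               return RecordState.ARCHIVED;
           }
           return requested;
       }

       public static void main(String[] args) {
           System.out.println(nextState(RecordState.AVAILABLE, 5, 5)); // ARCHIVED
           System.out.println(nextState(RecordState.AVAILABLE, 2, 5)); // AVAILABLE
       }
   }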



Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-11 Thread via GitHub


AndrewJSchofield commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1634746185


##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -350,87 +752,73 @@ public String toString() {
  * fetched from the leader. The state of the record is used to determine 
if the record should
  * be re-deliver or if it can be acknowledged or archived.
  */
-private static class InFlightState {
-/**
- * The state of the fetch batch records.
- */
+static final class InFlightState {
+
+// The state of the fetch batch records.
 private RecordState state;
-/**
- * The number of times the records has been delivered to the client.
- */
+// The number of times the records has been delivered to the client.
 private int deliveryCount;
-/**
- * The member id of the client that is fetching/acknowledging the 
record.
- */
+// The member id of the client that is fetching/acknowledging the 
record.
 private String memberId;
+// The timer task for the acquisition lock timeout.
+private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+
 
 InFlightState(RecordState state, int deliveryCount, String memberId) {
+this(state, deliveryCount, memberId, null);
+}
+
+InFlightState(RecordState state, int deliveryCount, String memberId, 
AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
 this.state = state;
 this.deliveryCount = deliveryCount;
 this.memberId = memberId;
+this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
 }
 
-@Override
-public int hashCode() {
-return Objects.hash(state, deliveryCount, memberId);
+void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask 
acquisitionLockTimeoutTask) {
+this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
 }
 
-@Override
-public boolean equals(Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
+void cancelAndClearAcquisitionLockTimeoutTask() {
+acquisitionLockTimeoutTask.cancel();
+acquisitionLockTimeoutTask = null;
+}
+
+/**
+ * Try to update the state of the records. The state of the records 
can only be updated if the
+ * new state is allowed to be transitioned from old state. The 
delivery count is not incremented
+ * if the state update is unsuccessful.
+ *
+ * @param newState The new state of the records.
+ * @param incrementDeliveryCount Whether to increment the delivery 
count.
+ *
+ * @return {@code InFlightState} if update succeeds, null otherwise. 
Returning state
+ * helps update chaining.
+ */
+private InFlightState tryUpdateState(RecordState newState, boolean 
incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
+try {
+if (newState == RecordState.AVAILABLE && deliveryCount >= 
maxDeliveryCount) {
+newState = RecordState.ARCHIVED;
+}
+state = state.validateTransition(newState);
+if (incrementDeliveryCount && newState != 
RecordState.ARCHIVED) {
+deliveryCount++;
+}
+memberId = newMemberId;
+return this;
+} catch (IllegalStateException e) {
+log.info("Failed to update state of the records", e);

Review Comment:
   I don't think log.info is appropriate here.



##
core/src/main/java/kafka/server/SharePartition.java:
##
@@ -310,37 +485,264 @@ private void initialize() {
 // TODO: Provide implementation to initialize the share partition.
 }
 
+private AcquiredRecords acquireNewBatchRecords(
+String memberId,
+long firstOffset,
+long lastOffset
+) {
+lock.writeLock().lock();
+try {
+// Schedule acquisition lock timeout for the batch.
+AcquisitionLockTimerTask timerTask = 
scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset);
+// Add the new batch to the in-flight records along with the 
acquisition lock timeout task for the batch.
+cachedState.put(firstOffset, new InFlightBatch(
+memberId,
+firstOffset,
+lastOffset,
+RecordState.ACQUIRED,
+1,
+timerTask));
+// if the cachedState was empty before acquiring the new batches 
then startOffset needs to be updated
+if (cachedState.firstKey() == firstOffset)  {
+startOffset = firstOffset;
+

[PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-10 Thread via GitHub


apoorvmittal10 opened a new pull request, #16274:
URL: https://github.com/apache/kafka/pull/16274

   This PR implements the share-fetch `next-fetch-offset` in the share partition
and the acquisition of records from the log.
   
   The Next Fetch Offset (NFO) determines where the share partition should
initiate the next data read from the Replica Manager. It typically sits just past
the most recently returned batch (last offset + 1), but there are exceptions:
messages marked available again due to release acknowledgements or lock timeouts
can pull the NFO back.
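
   A minimal sketch of that rule (simplified types, not the actual SharePartition
API): the NFO is the earliest offset that has become available again, and it falls
back to last offset + 1 only when nothing awaits re-delivery.

   import java.util.OptionalLong;
   import java.util.stream.LongStream;

   // Illustration only: released or lock-timed-out offsets pull the next fetch offset back.
   public class NextFetchOffsetExample {
       static long nextFetchOffset(long lastReturnedOffset, long... reAvailableOffsets) {
           OptionalLong earliest = LongStream.of(reAvailableOffsets).min();
           return earliest.orElse(lastReturnedOffset + 1);
       }

       public static void main(String[] args) {
           System.out.println(nextFetchOffset(20));          // 21: nothing to re-deliver
           System.out.println(nextFetchOffset(20, 12, 15));  // 12: re-deliver released records first
       }
   }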
   
   The acquire method caches the fetched batches as acquired in memory and spawns
a timer task for the acquisition lock timeout.
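
   As a rough illustration of the timer-task idea (using a plain JDK
ScheduledExecutorService as a stand-in for the server's Timer, so none of these
names are the PR's API):

   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   // Stand-in sketch: when a batch is acquired, schedule a task that releases it back
   // to AVAILABLE if it has not been acknowledged before the lock duration elapses.
   public class AcquisitionLockSketch {
       private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

       void onAcquired(long firstOffset, long lastOffset, long recordLockDurationMs, Runnable releaseBatch) {
           scheduler.schedule(() -> {
               // The real code re-checks the batch state under the write lock before releasing.
               System.out.printf("Lock timed out for %d-%d, releasing%n", firstOffset, lastOffset);
               releaseBatch.run();
           }, recordLockDurationMs, TimeUnit.MILLISECONDS);
       }
   }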
   
   ### Cache
   1. `Per-offset Metadata`: Simple to implement but inefficient. Every offset
requires in-memory storage and traversal, leading to high memory usage and
processing overhead, especially for per-batch acknowledgements (which is how
records will mostly be acknowledged).
   
   2. `Per-Replica Fetch Batch`: This approach aligns with the Replica Manager 
fetch batches. Since a full Replica Manager batch is retrieved whenever the 
requested offset falls within that batch's boundaries, a single Share Fetch 
request will likely receive an entire Replica Manager batch. However, there's a 
trade-off. Replica Manager batches are based on producer batching. If producers 
don't batch effectively, the in-flight metadata becomes heavily reliant on the 
producer's batching behavior.
   
   For per-message acknowledgements, per-offset tracking will be necessary, which
again requires splitting in-flight batches based on state. Splitting batches is
inefficient because it requires updating a cache that maintains sorted order.
Therefore, we propose a hybrid approach:
   
   We implemented a combination of option 2 (per in-flight batch tracking) and
option 1 (per-offset tracking). This aligns well with Replica Manager batching.
   
   State is maintained per in-flight batch. If states within a batch diverge due
to per-message acknowledgements, state tracking for that batch switches to
option 1 (per-offset tracking).
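
   A condensed sketch of this hybrid layout (illustrative names, not the PR's exact
fields): batch-level state lives in a sorted map keyed by first offset, and a
per-offset state map is materialised only once offsets within a batch diverge.

   import java.util.NavigableMap;
   import java.util.TreeMap;
   import java.util.concurrent.ConcurrentSkipListMap;

   // Hybrid in-flight cache: batch-level state by default, per-offset state on demand.
   class HybridInFlightCacheSketch {
       enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

       static final class Batch {
           final long firstOffset;
           final long lastOffset;
           State batchState = State.ACQUIRED;      // option 2: one state per batch
           NavigableMap<Long, State> offsetState;  // option 1: created only when needed

           Batch(long firstOffset, long lastOffset) {
               this.firstOffset = firstOffset;
               this.lastOffset = lastOffset;
           }

           // Switch to per-offset tracking the first time offsets within the batch diverge.
           void acknowledgeOffset(long offset) {
               if (offsetState == null) {
                   offsetState = new TreeMap<>();
                   for (long o = firstOffset; o <= lastOffset; o++) {
                       offsetState.put(o, batchState);
                   }
               }
               offsetState.put(offset, State.ACKNOWLEDGED);
           }
       }

       // In-flight batches keyed by first offset, mirroring the cachedState map in the PR.
       final NavigableMap<Long, Batch> cachedState = new ConcurrentSkipListMap<>();
   }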
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org