This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 25af4b8a7d0 MINOR: refactor Kafka Streams "segment" code (#21520)
25af4b8a7d0 is described below

commit 25af4b8a7d09d8f82cb0b1befeccae0a2000f527
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Feb 25 11:24:21 2026 -0800

    MINOR: refactor Kafka Streams "segment" code (#21520)
    
    All implementations of Segment implement Comparable. We can let Segment
    extend Comparable directly to reduce code duplication.
    
    All sub-classes of AbstractSegments have very similar implementations of
    getOrCreateSegment(). We can move the shared code into AbstractSegments.
    
    This PR also removes unused methods from SegmentedBytesStore.
    
    Reviewers: Alieh Saeedi <[email protected]>, TengYao Chi
     <[email protected]>, Bill Bejeck <[email protected]>
---
 .../AbstractRocksDBSegmentedBytesStore.java        |  9 -------
 ...tractRocksDBTimeOrderedSegmentedBytesStore.java |  6 -----
 .../streams/state/internals/AbstractSegments.java  | 25 ++++++++++++++++++-
 .../streams/state/internals/KeyValueSegment.java   | 14 +++++------
 .../streams/state/internals/KeyValueSegments.java  | 25 +++----------------
 .../state/internals/LogicalKeyValueSegment.java    |  7 +-----
 .../state/internals/LogicalKeyValueSegments.java   | 29 ++++++++--------------
 .../kafka/streams/state/internals/Segment.java     |  9 ++++++-
 .../state/internals/SegmentedBytesStore.java       | 20 ---------------
 .../state/internals/TimestampedSegment.java        | 15 ++++++-----
 .../internals/TimestampedSegmentWithHeaders.java   | 17 ++++++-------
 .../state/internals/TimestampedSegments.java       | 25 +++----------------
 .../internals/TimestampedSegmentsWithHeaders.java  | 25 +++----------------
 .../state/internals/KeyValueSegmentsTest.java      | 26 +++++++++----------
 .../state/internals/TimestampedSegmentsTest.java   | 26 +++++++++----------
 .../TimestampedSegmentsWithHeadersTest.java        |  6 ++---
 16 files changed, 106 insertions(+), 178 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 201ca7ff941..2baae916e34 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -244,15 +244,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
         segment.delete(key);
     }
 
-    @Override
-    public void remove(final Bytes key, final long timestamp) {
-        final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, 
timestamp);
-        final S segment = segments.segmentForTimestamp(timestamp);
-        if (segment != null) {
-            segment.deleteRange(keyBytes, keyBytes);
-        }
-    }
-
     @Override
     public void put(final Bytes key,
                     final byte[] value) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index e6a87f6110e..5ec5e011a2b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -244,12 +244,6 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
             forward);
     }
 
-
-    @Override
-    public void remove(final Bytes key, final long timestamp) {
-        throw new UnsupportedOperationException("Not supported operation");
-    }
-
     @Override
     public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
                                                     final long timeTo) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index 71044792a0b..83c11bd16c2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -57,6 +57,10 @@ abstract class AbstractSegments<S extends Segment> 
implements Segments<S> {
         this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
     }
 
+    protected abstract S createSegment(long segmentId, String segmentName);
+
+    protected abstract void openSegmentDB(final S segment, final 
StateStoreContext context);
+
     public void setPosition(final Position position) {
         this.position = position;
     }
@@ -80,6 +84,23 @@ abstract class AbstractSegments<S extends Segment> 
implements Segments<S> {
         return segments.get(segmentId(timestamp));
     }
 
+    @Override
+    public S getOrCreateSegment(final long segmentId,
+                                final StateStoreContext context) {
+        if (segments.containsKey(segmentId)) {
+            return segments.get(segmentId);
+        } else {
+            final S newSegment = createSegment(segmentId, 
segmentName(segmentId));
+
+            if (segments.put(segmentId, newSegment) != null) {
+                throw new 
IllegalStateException(newSegment.getClass().getSimpleName() + " already exists. 
Possible concurrent access.");
+            }
+
+            openSegmentDB(newSegment, context);
+            return newSegment;
+        }
+    }
+
     @Override
     public S getOrCreateSegmentIfLive(final long segmentId,
                                       final StateStoreContext context,
@@ -89,7 +110,9 @@ abstract class AbstractSegments<S extends Segment> 
implements Segments<S> {
 
         if (segmentId >= minLiveSegment) {
             // The segment is live. get it, ensure it's open, and return it.
-            return getOrCreateSegment(segmentId, context);
+            final S segment = getOrCreateSegment(segmentId, context);
+            cleanupExpiredSegments(streamTime);
+            return segment;
         } else {
             return null;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index 1cf631c3724..9935b80abf1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -26,8 +26,8 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 
-class KeyValueSegment extends RocksDBStore implements 
Comparable<KeyValueSegment>, Segment {
-    public final long id;
+class KeyValueSegment extends RocksDBStore implements Segment {
+    private final long id;
 
     KeyValueSegment(final String segmentName,
                     final String windowName,
@@ -39,6 +39,11 @@ class KeyValueSegment extends RocksDBStore implements 
Comparable<KeyValueSegment
         this.position = position;
     }
 
+    @Override
+    public long id() {
+        return id;
+    }
+
     @Override
     public void destroy() throws IOException {
         Utils.delete(dbDir);
@@ -49,11 +54,6 @@ class KeyValueSegment extends RocksDBStore implements 
Comparable<KeyValueSegment
         super.deleteRange(keyFrom, keyTo);
     }
 
-    @Override
-    public int compareTo(final KeyValueSegment segment) {
-        return Long.compare(id, segment.id);
-    }
-
     @Override
     public void openDB(final Map<String, Object> configs, final File stateDir) 
{
         super.openDB(configs, stateDir);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index a18d901b83f..d2def2c010f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -36,30 +36,13 @@ class KeyValueSegments extends 
AbstractSegments<KeyValueSegment> {
     }
 
     @Override
-    public KeyValueSegment getOrCreateSegment(final long segmentId,
-                                              final StateStoreContext context) 
{
-        if (segments.containsKey(segmentId)) {
-            return segments.get(segmentId);
-        } else {
-            final KeyValueSegment newSegment =
-                new KeyValueSegment(segmentName(segmentId), name, segmentId, 
position, metricsRecorder);
-
-            if (segments.put(segmentId, newSegment) != null) {
-                throw new IllegalStateException("KeyValueSegment already 
exists. Possible concurrent access.");
-            }
-
-            newSegment.openDB(context.appConfigs(), context.stateDir());
-            return newSegment;
-        }
+    protected KeyValueSegment createSegment(final long segmentId, final String 
segmentName) {
+        return new KeyValueSegment(segmentName, name, segmentId, position, 
metricsRecorder);
     }
 
     @Override
-    public KeyValueSegment getOrCreateSegmentIfLive(final long segmentId,
-                                                    final StateStoreContext 
context,
-                                                    final long streamTime) {
-        final KeyValueSegment segment = 
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
-        cleanupExpiredSegments(streamTime);
-        return segment;
+    protected void openSegmentDB(final KeyValueSegment segment, final 
StateStoreContext context) {
+        segment.openDB(context.appConfigs(), context.stateDir());
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
index 4d39b94561b..daa98142886 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
@@ -53,7 +53,7 @@ import static 
org.apache.kafka.streams.state.internals.RocksDBStore.incrementWit
  * stores a key into a shared physical store by prepending the key with a 
prefix (unique to
  * the specific logical segment), and storing the combined key into the 
physical store.
  */
-class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, 
Segment, VersionedStoreSegment {
+class LogicalKeyValueSegment implements Segment, VersionedStoreSegment {
     private static final Logger log = 
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
 
     private final long id;
@@ -78,11 +78,6 @@ class LogicalKeyValueSegment implements 
Comparable<LogicalKeyValueSegment>, Segm
         return id;
     }
 
-    @Override
-    public int compareTo(final LogicalKeyValueSegment segment) {
-        return Long.compare(id, segment.id);
-    }
-
     @Override
     public synchronized void destroy() {
         if (id < 0) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index fbca4f5cd4e..6d2ebee7638 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -62,25 +62,18 @@ public class LogicalKeyValueSegments extends 
AbstractSegments<LogicalKeyValueSeg
     }
 
     @Override
-    public LogicalKeyValueSegment getOrCreateSegment(final long segmentId,
-                                                     final StateStoreContext 
context) {
-        if (segments.containsKey(segmentId)) {
-            return segments.get(segmentId);
-        } else {
-            if (segmentId < 0) {
-                throw new IllegalArgumentException(
-                    "Negative segment IDs are reserved for reserved segments, "
-                        + "and should be created through 
createReservedSegment() instead");
-            }
-
-            final LogicalKeyValueSegment newSegment = new 
LogicalKeyValueSegment(segmentId, segmentName(segmentId), physicalStore);
-
-            if (segments.put(segmentId, newSegment) != null) {
-                throw new IllegalStateException("LogicalKeyValueSegment 
already exists. Possible concurrent access.");
-            }
-
-            return newSegment;
+    protected LogicalKeyValueSegment createSegment(final long segmentId, final 
String segmentName) {
+        if (segmentId < 0) {
+            throw new IllegalArgumentException(
+                "Negative segment IDs are reserved for reserved segments, "
+                    + "and should be created through createReservedSegment() 
instead");
         }
+        return new LogicalKeyValueSegment(segmentId, segmentName, 
physicalStore);
+    }
+
+    @Override
+    protected void openSegmentDB(final LogicalKeyValueSegment segment, final 
StateStoreContext context) {
+        // no-op -- a logical segment is just a view on an underlying physical 
store
     }
 
     LogicalKeyValueSegment createReservedSegment(final long segmentId,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index ea3b89a525e..598b396f3fd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -21,9 +21,16 @@ import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.io.IOException;
 
-public interface Segment extends KeyValueStore<Bytes, byte[]>, 
BatchWritingStore {
+public interface Segment extends KeyValueStore<Bytes, byte[]>, 
BatchWritingStore, Comparable<Segment> {
+
+    long id();
 
     void destroy() throws IOException;
 
     void deleteRange(Bytes keyFrom, Bytes keyTo);
+
+    @Override
+    default int compareTo(final Segment segment) {
+        return Long.compare(id(), segment.id());
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 3b813262d6f..957867d21ff 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -109,14 +109,6 @@ public interface SegmentedBytesStore extends StateStore {
      */
     void remove(Bytes key);
 
-    /**
-     * Remove all duplicated records with the provided key in the specified 
timestamp.
-     *
-     * @param key   the segmented key to remove
-     * @param timestamp  the timestamp to match
-     */
-    void remove(Bytes key, long timestamp);
-
     /**
      * Write a new value to the store with the provided key. The key
      * should be a composite of the record key, and the timestamp information 
etc
@@ -159,18 +151,6 @@ public interface SegmentedBytesStore extends StateStore {
          */
         Bytes lowerRange(final Bytes key, final long from);
 
-        /**
-         * Given a record key and a time, construct a Segmented key to search 
when performing
-         * prefixed queries.
-         *
-         * @param key
-         * @param timestamp
-         * @return  The key that represents the prefixed Segmented key in 
bytes.
-         */
-        default Bytes toStoreBinaryKeyPrefix(final Bytes key, final long 
timestamp) {
-            throw new UnsupportedOperationException();
-        }
-
         /**
          * Given a range of fixed size record keys and a time, construct a 
Segmented key that represents
          * the upper range of keys to search when performing range queries.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
index 1bf07afab77..fab99b0c293 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
@@ -26,8 +26,8 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 
-class TimestampedSegment extends RocksDBTimestampedStore implements 
Comparable<TimestampedSegment>, Segment {
-    public final long id;
+class TimestampedSegment extends RocksDBTimestampedStore implements Segment {
+    private final long id;
 
     TimestampedSegment(final String segmentName,
                        final String windowName,
@@ -39,6 +39,11 @@ class TimestampedSegment extends RocksDBTimestampedStore 
implements Comparable<T
         this.position = position;
     }
 
+    @Override
+    public long id() {
+        return id;
+    }
+
     @Override
     public void destroy() throws IOException {
         Utils.delete(dbDir);
@@ -49,15 +54,9 @@ class TimestampedSegment extends RocksDBTimestampedStore 
implements Comparable<T
         throw new UnsupportedOperationException();
     }
 
-    @Override
-    public int compareTo(final TimestampedSegment segment) {
-        return Long.compare(id, segment.id);
-    }
-
     @Override
     public void openDB(final Map<String, Object> configs, final File stateDir) 
{
         super.openDB(configs, stateDir);
-        // skip the registering step
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
index aeba24d30e4..5ecc13e8fc2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
@@ -33,10 +33,8 @@ import java.util.Objects;
  * header-aware storage with dual-column-family migration support from
  * timestamp-only format to timestamp+headers format.
  */
-class TimestampedSegmentWithHeaders extends RocksDBTimestampedStoreWithHeaders
-    implements Comparable<TimestampedSegmentWithHeaders>, Segment {
-
-    public final long id;
+class TimestampedSegmentWithHeaders extends RocksDBTimestampedStoreWithHeaders 
implements Segment {
+    private final long id;
 
     TimestampedSegmentWithHeaders(final String segmentName,
                                   final String windowName,
@@ -48,6 +46,11 @@ class TimestampedSegmentWithHeaders extends 
RocksDBTimestampedStoreWithHeaders
         this.position = position;
     }
 
+    @Override
+    public long id() {
+        return id;
+    }
+
     @Override
     public void destroy() throws IOException {
         Utils.delete(dbDir);
@@ -58,15 +61,9 @@ class TimestampedSegmentWithHeaders extends 
RocksDBTimestampedStoreWithHeaders
         throw new UnsupportedOperationException();
     }
 
-    @Override
-    public int compareTo(final TimestampedSegmentWithHeaders segment) {
-        return Long.compare(id, segment.id);
-    }
-
     @Override
     public void openDB(final Map<String, Object> configs, final File stateDir) 
{
         super.openDB(configs, stateDir);
-        // skip the registering step
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index 70fae503060..f63b3f64d59 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -36,30 +36,13 @@ class TimestampedSegments extends 
AbstractSegments<TimestampedSegment> {
     }
 
     @Override
-    public TimestampedSegment getOrCreateSegment(final long segmentId,
-                                                 final StateStoreContext 
context) {
-        if (segments.containsKey(segmentId)) {
-            return segments.get(segmentId);
-        } else {
-            final TimestampedSegment newSegment =
-                new TimestampedSegment(segmentName(segmentId), name, 
segmentId, position, metricsRecorder);
-
-            if (segments.put(segmentId, newSegment) != null) {
-                throw new IllegalStateException("TimestampedSegment already 
exists. Possible concurrent access.");
-            }
-
-            newSegment.openDB(context.appConfigs(), context.stateDir());
-            return newSegment;
-        }
+    protected TimestampedSegment createSegment(final long segmentId, final 
String segmentName) {
+        return new TimestampedSegment(segmentName, name, segmentId, position, 
metricsRecorder);
     }
 
     @Override
-    public TimestampedSegment getOrCreateSegmentIfLive(final long segmentId,
-                                                    final StateStoreContext 
context,
-                                                    final long streamTime) {
-        final TimestampedSegment segment = 
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
-        cleanupExpiredSegments(streamTime);
-        return segment;
+    protected void openSegmentDB(final TimestampedSegment segment, final 
StateStoreContext context) {
+        segment.openDB(context.appConfigs(), context.stateDir());
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
index 4e770cc24bd..9b6433d041e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
@@ -36,30 +36,13 @@ class TimestampedSegmentsWithHeaders extends 
AbstractSegments<TimestampedSegment
     }
 
     @Override
-    public TimestampedSegmentWithHeaders getOrCreateSegment(final long 
segmentId,
-                                                            final 
StateStoreContext context) {
-        if (segments.containsKey(segmentId)) {
-            return segments.get(segmentId);
-        } else {
-            final TimestampedSegmentWithHeaders newSegment =
-                new TimestampedSegmentWithHeaders(segmentName(segmentId), 
name, segmentId, position, metricsRecorder);
-
-            if (segments.put(segmentId, newSegment) != null) {
-                throw new IllegalStateException("TimestampedSegmentWithHeaders 
already exists. Possible concurrent access.");
-            }
-
-            newSegment.openDB(context.appConfigs(), context.stateDir());
-            return newSegment;
-        }
+    protected TimestampedSegmentWithHeaders createSegment(final long 
segmentId, final String segmentName) {
+        return new TimestampedSegmentWithHeaders(segmentName, name, segmentId, 
position, metricsRecorder);
     }
 
     @Override
-    public TimestampedSegmentWithHeaders getOrCreateSegmentIfLive(final long 
segmentId,
-                                                                   final 
StateStoreContext context,
-                                                                   final long 
streamTime) {
-        final TimestampedSegmentWithHeaders segment = 
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
-        cleanupExpiredSegments(streamTime);
-        return segment;
+    protected void openSegmentDB(final TimestampedSegmentWithHeaders segment, 
final StateStoreContext context) {
+        segment.openDB(context.appConfigs(), context.stateDir());
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index 9e083ebbaf2..50da7b5d9f2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -191,9 +191,9 @@ public class KeyValueSegmentsTest {
 
         final List<KeyValueSegment> segments = this.segments.segments(0, 2 * 
SEGMENT_INTERVAL, true);
         assertEquals(3, segments.size());
-        assertEquals(0, segments.get(0).id);
-        assertEquals(1, segments.get(1).id);
-        assertEquals(2, segments.get(2).id);
+        assertEquals(0, segments.get(0).id());
+        assertEquals(1, segments.get(1).id());
+        assertEquals(2, segments.get(2).id());
     }
 
     @Test
@@ -211,9 +211,9 @@ public class KeyValueSegmentsTest {
 
         final List<KeyValueSegment> segments = this.segments.segments(0, 2 * 
SEGMENT_INTERVAL, false);
         assertEquals(3, segments.size());
-        assertEquals(0, segments.get(2).id);
-        assertEquals(1, segments.get(1).id);
-        assertEquals(2, segments.get(0).id);
+        assertEquals(0, segments.get(2).id());
+        assertEquals(1, segments.get(1).id());
+        assertEquals(2, segments.get(0).id());
     }
 
     @Test
@@ -226,9 +226,9 @@ public class KeyValueSegmentsTest {
 
         final List<KeyValueSegment> segments = this.segments.segments(0, 2 * 
SEGMENT_INTERVAL, true);
         assertEquals(3, segments.size());
-        assertEquals(0, segments.get(0).id);
-        assertEquals(1, segments.get(1).id);
-        assertEquals(2, segments.get(2).id);
+        assertEquals(0, segments.get(0).id());
+        assertEquals(1, segments.get(1).id());
+        assertEquals(2, segments.get(2).id());
     }
 
     @Test
@@ -241,9 +241,9 @@ public class KeyValueSegmentsTest {
 
         final List<KeyValueSegment> segments = this.segments.segments(0, 2 * 
SEGMENT_INTERVAL, false);
         assertEquals(3, segments.size());
-        assertEquals(2, segments.get(0).id);
-        assertEquals(1, segments.get(1).id);
-        assertEquals(0, segments.get(2).id);
+        assertEquals(2, segments.get(0).id());
+        assertEquals(1, segments.get(1).id());
+        assertEquals(0, segments.get(2).id());
     }
 
     @Test
@@ -349,7 +349,7 @@ public class KeyValueSegmentsTest {
         final List<KeyValueSegment> result = this.segments.segments(0, 
Long.MAX_VALUE, true);
         assertEquals(numSegments, result.size());
         for (int i = 0; i < numSegments; i++) {
-            assertEquals(i + first, result.get(i).id);
+            assertEquals(i + first, result.get(i).id());
         }
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index 63a70acb11c..ef9f5625d7a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -192,9 +192,9 @@ public class TimestampedSegmentsTest {
 
         final List<TimestampedSegment> segments = this.segments.segments(0, 2 
* SEGMENT_INTERVAL, true);
         assertEquals(3, segments.size());
-        assertEquals(0, segments.get(0).id);
-        assertEquals(1, segments.get(1).id);
-        assertEquals(2, segments.get(2).id);
+        assertEquals(0, segments.get(0).id());
+        assertEquals(1, segments.get(1).id());
+        assertEquals(2, segments.get(2).id());
     }
 
     @Test
@@ -212,9 +212,9 @@ public class TimestampedSegmentsTest {
 
         final List<TimestampedSegment> segments = this.segments.segments(0, 2 
* SEGMENT_INTERVAL, false);
         assertEquals(3, segments.size());
-        assertEquals(0, segments.get(2).id);
-        assertEquals(1, segments.get(1).id);
-        assertEquals(2, segments.get(0).id);
+        assertEquals(0, segments.get(2).id());
+        assertEquals(1, segments.get(1).id());
+        assertEquals(2, segments.get(0).id());
     }
 
     @Test
@@ -227,9 +227,9 @@ public class TimestampedSegmentsTest {
 
         final List<TimestampedSegment> segments = this.segments.segments(0, 2 
* SEGMENT_INTERVAL, true);
         assertEquals(3, segments.size());
-        assertEquals(0, segments.get(0).id);
-        assertEquals(1, segments.get(1).id);
-        assertEquals(2, segments.get(2).id);
+        assertEquals(0, segments.get(0).id());
+        assertEquals(1, segments.get(1).id());
+        assertEquals(2, segments.get(2).id());
     }
 
     @Test
@@ -242,9 +242,9 @@ public class TimestampedSegmentsTest {
 
         final List<TimestampedSegment> segments = this.segments.segments(0, 2 
* SEGMENT_INTERVAL, false);
         assertEquals(3, segments.size());
-        assertEquals(0, segments.get(2).id);
-        assertEquals(1, segments.get(1).id);
-        assertEquals(2, segments.get(0).id);
+        assertEquals(0, segments.get(2).id());
+        assertEquals(1, segments.get(1).id());
+        assertEquals(2, segments.get(0).id());
     }
 
     @Test
@@ -350,7 +350,7 @@ public class TimestampedSegmentsTest {
         final List<TimestampedSegment> result = this.segments.segments(0, 
Long.MAX_VALUE, true);
         assertEquals(numSegments, result.size());
         for (int i = 0; i < numSegments; i++) {
-            assertEquals(i + first, result.get(i).id);
+            assertEquals(i + first, result.get(i).id());
         }
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
index 1f580b3a601..b3651cf7b25 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
@@ -177,9 +177,9 @@ public class TimestampedSegmentsWithHeadersTest {
 
         final List<TimestampedSegmentWithHeaders> segments = 
this.segments.segments(0, 2 * SEGMENT_INTERVAL, true);
         assertEquals(3, segments.size());
-        assertEquals(0, segments.get(0).id);
-        assertEquals(1, segments.get(1).id);
-        assertEquals(2, segments.get(2).id);
+        assertEquals(0, segments.get(0).id());
+        assertEquals(1, segments.get(1).id());
+        assertEquals(2, segments.get(2).id());
     }
 
     @Test

Reply via email to