This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25af4b8a7d0 MINOR: refactor Kafka Streams "segment" code (#21520)
25af4b8a7d0 is described below
commit 25af4b8a7d09d8f82cb0b1befeccae0a2000f527
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Feb 25 11:24:21 2026 -0800
MINOR: refactor Kafka Streams "segment" code (#21520)
All implementations of Segment implement Comparable. We can let Segment
extend Comparable directly to reduce code duplication.
All sub-classes of AbstractSegments have very similar implementations of
getOrCreateSegment(). We can move the shared code into AbstractSegments.
This PR also removes unused methods from SegmentedBytesStore.
Reviewers: Alieh Saeedi <[email protected]>, TengYao Chi
<[email protected]>, Bill Bejeck <[email protected]>
---
.../AbstractRocksDBSegmentedBytesStore.java | 9 -------
...tractRocksDBTimeOrderedSegmentedBytesStore.java | 6 -----
.../streams/state/internals/AbstractSegments.java | 25 ++++++++++++++++++-
.../streams/state/internals/KeyValueSegment.java | 14 +++++------
.../streams/state/internals/KeyValueSegments.java | 25 +++----------------
.../state/internals/LogicalKeyValueSegment.java | 7 +-----
.../state/internals/LogicalKeyValueSegments.java | 29 ++++++++--------------
.../kafka/streams/state/internals/Segment.java | 9 ++++++-
.../state/internals/SegmentedBytesStore.java | 20 ---------------
.../state/internals/TimestampedSegment.java | 15 ++++++-----
.../internals/TimestampedSegmentWithHeaders.java | 17 ++++++-------
.../state/internals/TimestampedSegments.java | 25 +++----------------
.../internals/TimestampedSegmentsWithHeaders.java | 25 +++----------------
.../state/internals/KeyValueSegmentsTest.java | 26 +++++++++----------
.../state/internals/TimestampedSegmentsTest.java | 26 +++++++++----------
.../TimestampedSegmentsWithHeadersTest.java | 6 ++---
16 files changed, 106 insertions(+), 178 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 201ca7ff941..2baae916e34 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -244,15 +244,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
segment.delete(key);
}
- @Override
- public void remove(final Bytes key, final long timestamp) {
- final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key,
timestamp);
- final S segment = segments.segmentForTimestamp(timestamp);
- if (segment != null) {
- segment.deleteRange(keyBytes, keyBytes);
- }
- }
-
@Override
public void put(final Bytes key,
final byte[] value) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index e6a87f6110e..5ec5e011a2b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -244,12 +244,6 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
forward);
}
-
- @Override
- public void remove(final Bytes key, final long timestamp) {
- throw new UnsupportedOperationException("Not supported operation");
- }
-
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index 71044792a0b..83c11bd16c2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -57,6 +57,10 @@ abstract class AbstractSegments<S extends Segment>
implements Segments<S> {
this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
}
+ protected abstract S createSegment(long segmentId, String segmentName);
+
+ protected abstract void openSegmentDB(final S segment, final
StateStoreContext context);
+
public void setPosition(final Position position) {
this.position = position;
}
@@ -80,6 +84,23 @@ abstract class AbstractSegments<S extends Segment>
implements Segments<S> {
return segments.get(segmentId(timestamp));
}
+ @Override
+ public S getOrCreateSegment(final long segmentId,
+ final StateStoreContext context) {
+ if (segments.containsKey(segmentId)) {
+ return segments.get(segmentId);
+ } else {
+ final S newSegment = createSegment(segmentId,
segmentName(segmentId));
+
+ if (segments.put(segmentId, newSegment) != null) {
+ throw new
IllegalStateException(newSegment.getClass().getSimpleName() + " already exists.
Possible concurrent access.");
+ }
+
+ openSegmentDB(newSegment, context);
+ return newSegment;
+ }
+ }
+
@Override
public S getOrCreateSegmentIfLive(final long segmentId,
final StateStoreContext context,
@@ -89,7 +110,9 @@ abstract class AbstractSegments<S extends Segment>
implements Segments<S> {
if (segmentId >= minLiveSegment) {
// The segment is live. get it, ensure it's open, and return it.
- return getOrCreateSegment(segmentId, context);
+ final S segment = getOrCreateSegment(segmentId, context);
+ cleanupExpiredSegments(streamTime);
+ return segment;
} else {
return null;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index 1cf631c3724..9935b80abf1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -26,8 +26,8 @@ import java.io.IOException;
import java.util.Map;
import java.util.Objects;
-class KeyValueSegment extends RocksDBStore implements
Comparable<KeyValueSegment>, Segment {
- public final long id;
+class KeyValueSegment extends RocksDBStore implements Segment {
+ private final long id;
KeyValueSegment(final String segmentName,
final String windowName,
@@ -39,6 +39,11 @@ class KeyValueSegment extends RocksDBStore implements
Comparable<KeyValueSegment
this.position = position;
}
+ @Override
+ public long id() {
+ return id;
+ }
+
@Override
public void destroy() throws IOException {
Utils.delete(dbDir);
@@ -49,11 +54,6 @@ class KeyValueSegment extends RocksDBStore implements
Comparable<KeyValueSegment
super.deleteRange(keyFrom, keyTo);
}
- @Override
- public int compareTo(final KeyValueSegment segment) {
- return Long.compare(id, segment.id);
- }
-
@Override
public void openDB(final Map<String, Object> configs, final File stateDir)
{
super.openDB(configs, stateDir);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index a18d901b83f..d2def2c010f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -36,30 +36,13 @@ class KeyValueSegments extends
AbstractSegments<KeyValueSegment> {
}
@Override
- public KeyValueSegment getOrCreateSegment(final long segmentId,
- final StateStoreContext context)
{
- if (segments.containsKey(segmentId)) {
- return segments.get(segmentId);
- } else {
- final KeyValueSegment newSegment =
- new KeyValueSegment(segmentName(segmentId), name, segmentId,
position, metricsRecorder);
-
- if (segments.put(segmentId, newSegment) != null) {
- throw new IllegalStateException("KeyValueSegment already
exists. Possible concurrent access.");
- }
-
- newSegment.openDB(context.appConfigs(), context.stateDir());
- return newSegment;
- }
+ protected KeyValueSegment createSegment(final long segmentId, final String
segmentName) {
+ return new KeyValueSegment(segmentName, name, segmentId, position,
metricsRecorder);
}
@Override
- public KeyValueSegment getOrCreateSegmentIfLive(final long segmentId,
- final StateStoreContext
context,
- final long streamTime) {
- final KeyValueSegment segment =
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
- cleanupExpiredSegments(streamTime);
- return segment;
+ protected void openSegmentDB(final KeyValueSegment segment, final
StateStoreContext context) {
+ segment.openDB(context.appConfigs(), context.stateDir());
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
index 4d39b94561b..daa98142886 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
@@ -53,7 +53,7 @@ import static
org.apache.kafka.streams.state.internals.RocksDBStore.incrementWit
* stores a key into a shared physical store by prepending the key with a
prefix (unique to
* the specific logical segment), and storing the combined key into the
physical store.
*/
-class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>,
Segment, VersionedStoreSegment {
+class LogicalKeyValueSegment implements Segment, VersionedStoreSegment {
private static final Logger log =
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
private final long id;
@@ -78,11 +78,6 @@ class LogicalKeyValueSegment implements
Comparable<LogicalKeyValueSegment>, Segm
return id;
}
- @Override
- public int compareTo(final LogicalKeyValueSegment segment) {
- return Long.compare(id, segment.id);
- }
-
@Override
public synchronized void destroy() {
if (id < 0) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index fbca4f5cd4e..6d2ebee7638 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -62,25 +62,18 @@ public class LogicalKeyValueSegments extends
AbstractSegments<LogicalKeyValueSeg
}
@Override
- public LogicalKeyValueSegment getOrCreateSegment(final long segmentId,
- final StateStoreContext
context) {
- if (segments.containsKey(segmentId)) {
- return segments.get(segmentId);
- } else {
- if (segmentId < 0) {
- throw new IllegalArgumentException(
- "Negative segment IDs are reserved for reserved segments, "
- + "and should be created through
createReservedSegment() instead");
- }
-
- final LogicalKeyValueSegment newSegment = new
LogicalKeyValueSegment(segmentId, segmentName(segmentId), physicalStore);
-
- if (segments.put(segmentId, newSegment) != null) {
- throw new IllegalStateException("LogicalKeyValueSegment
already exists. Possible concurrent access.");
- }
-
- return newSegment;
+ protected LogicalKeyValueSegment createSegment(final long segmentId, final
String segmentName) {
+ if (segmentId < 0) {
+ throw new IllegalArgumentException(
+ "Negative segment IDs are reserved for reserved segments, "
+ + "and should be created through createReservedSegment()
instead");
}
+ return new LogicalKeyValueSegment(segmentId, segmentName,
physicalStore);
+ }
+
+ @Override
+ protected void openSegmentDB(final LogicalKeyValueSegment segment, final
StateStoreContext context) {
+ // no-op -- a logical segment is just a view on an underlying physical
store
}
LogicalKeyValueSegment createReservedSegment(final long segmentId,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index ea3b89a525e..598b396f3fd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -21,9 +21,16 @@ import org.apache.kafka.streams.state.KeyValueStore;
import java.io.IOException;
-public interface Segment extends KeyValueStore<Bytes, byte[]>,
BatchWritingStore {
+public interface Segment extends KeyValueStore<Bytes, byte[]>,
BatchWritingStore, Comparable<Segment> {
+
+ long id();
void destroy() throws IOException;
void deleteRange(Bytes keyFrom, Bytes keyTo);
+
+ @Override
+ default int compareTo(final Segment segment) {
+ return Long.compare(id(), segment.id());
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 3b813262d6f..957867d21ff 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -109,14 +109,6 @@ public interface SegmentedBytesStore extends StateStore {
*/
void remove(Bytes key);
- /**
- * Remove all duplicated records with the provided key in the specified
timestamp.
- *
- * @param key the segmented key to remove
- * @param timestamp the timestamp to match
- */
- void remove(Bytes key, long timestamp);
-
/**
* Write a new value to the store with the provided key. The key
* should be a composite of the record key, and the timestamp information
etc
@@ -159,18 +151,6 @@ public interface SegmentedBytesStore extends StateStore {
*/
Bytes lowerRange(final Bytes key, final long from);
- /**
- * Given a record key and a time, construct a Segmented key to search
when performing
- * prefixed queries.
- *
- * @param key
- * @param timestamp
- * @return The key that represents the prefixed Segmented key in
bytes.
- */
- default Bytes toStoreBinaryKeyPrefix(final Bytes key, final long
timestamp) {
- throw new UnsupportedOperationException();
- }
-
/**
* Given a range of fixed size record keys and a time, construct a
Segmented key that represents
* the upper range of keys to search when performing range queries.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
index 1bf07afab77..fab99b0c293 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
@@ -26,8 +26,8 @@ import java.io.IOException;
import java.util.Map;
import java.util.Objects;
-class TimestampedSegment extends RocksDBTimestampedStore implements
Comparable<TimestampedSegment>, Segment {
- public final long id;
+class TimestampedSegment extends RocksDBTimestampedStore implements Segment {
+ private final long id;
TimestampedSegment(final String segmentName,
final String windowName,
@@ -39,6 +39,11 @@ class TimestampedSegment extends RocksDBTimestampedStore
implements Comparable<T
this.position = position;
}
+ @Override
+ public long id() {
+ return id;
+ }
+
@Override
public void destroy() throws IOException {
Utils.delete(dbDir);
@@ -49,15 +54,9 @@ class TimestampedSegment extends RocksDBTimestampedStore
implements Comparable<T
throw new UnsupportedOperationException();
}
- @Override
- public int compareTo(final TimestampedSegment segment) {
- return Long.compare(id, segment.id);
- }
-
@Override
public void openDB(final Map<String, Object> configs, final File stateDir)
{
super.openDB(configs, stateDir);
- // skip the registering step
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
index aeba24d30e4..5ecc13e8fc2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
@@ -33,10 +33,8 @@ import java.util.Objects;
* header-aware storage with dual-column-family migration support from
* timestamp-only format to timestamp+headers format.
*/
-class TimestampedSegmentWithHeaders extends RocksDBTimestampedStoreWithHeaders
- implements Comparable<TimestampedSegmentWithHeaders>, Segment {
-
- public final long id;
+class TimestampedSegmentWithHeaders extends RocksDBTimestampedStoreWithHeaders
implements Segment {
+ private final long id;
TimestampedSegmentWithHeaders(final String segmentName,
final String windowName,
@@ -48,6 +46,11 @@ class TimestampedSegmentWithHeaders extends
RocksDBTimestampedStoreWithHeaders
this.position = position;
}
+ @Override
+ public long id() {
+ return id;
+ }
+
@Override
public void destroy() throws IOException {
Utils.delete(dbDir);
@@ -58,15 +61,9 @@ class TimestampedSegmentWithHeaders extends
RocksDBTimestampedStoreWithHeaders
throw new UnsupportedOperationException();
}
- @Override
- public int compareTo(final TimestampedSegmentWithHeaders segment) {
- return Long.compare(id, segment.id);
- }
-
@Override
public void openDB(final Map<String, Object> configs, final File stateDir)
{
super.openDB(configs, stateDir);
- // skip the registering step
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index 70fae503060..f63b3f64d59 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -36,30 +36,13 @@ class TimestampedSegments extends
AbstractSegments<TimestampedSegment> {
}
@Override
- public TimestampedSegment getOrCreateSegment(final long segmentId,
- final StateStoreContext
context) {
- if (segments.containsKey(segmentId)) {
- return segments.get(segmentId);
- } else {
- final TimestampedSegment newSegment =
- new TimestampedSegment(segmentName(segmentId), name,
segmentId, position, metricsRecorder);
-
- if (segments.put(segmentId, newSegment) != null) {
- throw new IllegalStateException("TimestampedSegment already
exists. Possible concurrent access.");
- }
-
- newSegment.openDB(context.appConfigs(), context.stateDir());
- return newSegment;
- }
+ protected TimestampedSegment createSegment(final long segmentId, final
String segmentName) {
+ return new TimestampedSegment(segmentName, name, segmentId, position,
metricsRecorder);
}
@Override
- public TimestampedSegment getOrCreateSegmentIfLive(final long segmentId,
- final StateStoreContext
context,
- final long streamTime) {
- final TimestampedSegment segment =
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
- cleanupExpiredSegments(streamTime);
- return segment;
+ protected void openSegmentDB(final TimestampedSegment segment, final
StateStoreContext context) {
+ segment.openDB(context.appConfigs(), context.stateDir());
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
index 4e770cc24bd..9b6433d041e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
@@ -36,30 +36,13 @@ class TimestampedSegmentsWithHeaders extends
AbstractSegments<TimestampedSegment
}
@Override
- public TimestampedSegmentWithHeaders getOrCreateSegment(final long
segmentId,
- final
StateStoreContext context) {
- if (segments.containsKey(segmentId)) {
- return segments.get(segmentId);
- } else {
- final TimestampedSegmentWithHeaders newSegment =
- new TimestampedSegmentWithHeaders(segmentName(segmentId),
name, segmentId, position, metricsRecorder);
-
- if (segments.put(segmentId, newSegment) != null) {
- throw new IllegalStateException("TimestampedSegmentWithHeaders
already exists. Possible concurrent access.");
- }
-
- newSegment.openDB(context.appConfigs(), context.stateDir());
- return newSegment;
- }
+ protected TimestampedSegmentWithHeaders createSegment(final long
segmentId, final String segmentName) {
+ return new TimestampedSegmentWithHeaders(segmentName, name, segmentId,
position, metricsRecorder);
}
@Override
- public TimestampedSegmentWithHeaders getOrCreateSegmentIfLive(final long
segmentId,
- final
StateStoreContext context,
- final long
streamTime) {
- final TimestampedSegmentWithHeaders segment =
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
- cleanupExpiredSegments(streamTime);
- return segment;
+ protected void openSegmentDB(final TimestampedSegmentWithHeaders segment,
final StateStoreContext context) {
+ segment.openDB(context.appConfigs(), context.stateDir());
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index 9e083ebbaf2..50da7b5d9f2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -191,9 +191,9 @@ public class KeyValueSegmentsTest {
final List<KeyValueSegment> segments = this.segments.segments(0, 2 *
SEGMENT_INTERVAL, true);
assertEquals(3, segments.size());
- assertEquals(0, segments.get(0).id);
- assertEquals(1, segments.get(1).id);
- assertEquals(2, segments.get(2).id);
+ assertEquals(0, segments.get(0).id());
+ assertEquals(1, segments.get(1).id());
+ assertEquals(2, segments.get(2).id());
}
@Test
@@ -211,9 +211,9 @@ public class KeyValueSegmentsTest {
final List<KeyValueSegment> segments = this.segments.segments(0, 2 *
SEGMENT_INTERVAL, false);
assertEquals(3, segments.size());
- assertEquals(0, segments.get(2).id);
- assertEquals(1, segments.get(1).id);
- assertEquals(2, segments.get(0).id);
+ assertEquals(0, segments.get(2).id());
+ assertEquals(1, segments.get(1).id());
+ assertEquals(2, segments.get(0).id());
}
@Test
@@ -226,9 +226,9 @@ public class KeyValueSegmentsTest {
final List<KeyValueSegment> segments = this.segments.segments(0, 2 *
SEGMENT_INTERVAL, true);
assertEquals(3, segments.size());
- assertEquals(0, segments.get(0).id);
- assertEquals(1, segments.get(1).id);
- assertEquals(2, segments.get(2).id);
+ assertEquals(0, segments.get(0).id());
+ assertEquals(1, segments.get(1).id());
+ assertEquals(2, segments.get(2).id());
}
@Test
@@ -241,9 +241,9 @@ public class KeyValueSegmentsTest {
final List<KeyValueSegment> segments = this.segments.segments(0, 2 *
SEGMENT_INTERVAL, false);
assertEquals(3, segments.size());
- assertEquals(2, segments.get(0).id);
- assertEquals(1, segments.get(1).id);
- assertEquals(0, segments.get(2).id);
+ assertEquals(2, segments.get(0).id());
+ assertEquals(1, segments.get(1).id());
+ assertEquals(0, segments.get(2).id());
}
@Test
@@ -349,7 +349,7 @@ public class KeyValueSegmentsTest {
final List<KeyValueSegment> result = this.segments.segments(0,
Long.MAX_VALUE, true);
assertEquals(numSegments, result.size());
for (int i = 0; i < numSegments; i++) {
- assertEquals(i + first, result.get(i).id);
+ assertEquals(i + first, result.get(i).id());
}
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index 63a70acb11c..ef9f5625d7a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -192,9 +192,9 @@ public class TimestampedSegmentsTest {
final List<TimestampedSegment> segments = this.segments.segments(0, 2
* SEGMENT_INTERVAL, true);
assertEquals(3, segments.size());
- assertEquals(0, segments.get(0).id);
- assertEquals(1, segments.get(1).id);
- assertEquals(2, segments.get(2).id);
+ assertEquals(0, segments.get(0).id());
+ assertEquals(1, segments.get(1).id());
+ assertEquals(2, segments.get(2).id());
}
@Test
@@ -212,9 +212,9 @@ public class TimestampedSegmentsTest {
final List<TimestampedSegment> segments = this.segments.segments(0, 2
* SEGMENT_INTERVAL, false);
assertEquals(3, segments.size());
- assertEquals(0, segments.get(2).id);
- assertEquals(1, segments.get(1).id);
- assertEquals(2, segments.get(0).id);
+ assertEquals(0, segments.get(2).id());
+ assertEquals(1, segments.get(1).id());
+ assertEquals(2, segments.get(0).id());
}
@Test
@@ -227,9 +227,9 @@ public class TimestampedSegmentsTest {
final List<TimestampedSegment> segments = this.segments.segments(0, 2
* SEGMENT_INTERVAL, true);
assertEquals(3, segments.size());
- assertEquals(0, segments.get(0).id);
- assertEquals(1, segments.get(1).id);
- assertEquals(2, segments.get(2).id);
+ assertEquals(0, segments.get(0).id());
+ assertEquals(1, segments.get(1).id());
+ assertEquals(2, segments.get(2).id());
}
@Test
@@ -242,9 +242,9 @@ public class TimestampedSegmentsTest {
final List<TimestampedSegment> segments = this.segments.segments(0, 2
* SEGMENT_INTERVAL, false);
assertEquals(3, segments.size());
- assertEquals(0, segments.get(2).id);
- assertEquals(1, segments.get(1).id);
- assertEquals(2, segments.get(0).id);
+ assertEquals(0, segments.get(2).id());
+ assertEquals(1, segments.get(1).id());
+ assertEquals(2, segments.get(0).id());
}
@Test
@@ -350,7 +350,7 @@ public class TimestampedSegmentsTest {
final List<TimestampedSegment> result = this.segments.segments(0,
Long.MAX_VALUE, true);
assertEquals(numSegments, result.size());
for (int i = 0; i < numSegments; i++) {
- assertEquals(i + first, result.get(i).id);
+ assertEquals(i + first, result.get(i).id());
}
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
index 1f580b3a601..b3651cf7b25 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
@@ -177,9 +177,9 @@ public class TimestampedSegmentsWithHeadersTest {
final List<TimestampedSegmentWithHeaders> segments =
this.segments.segments(0, 2 * SEGMENT_INTERVAL, true);
assertEquals(3, segments.size());
- assertEquals(0, segments.get(0).id);
- assertEquals(1, segments.get(1).id);
- assertEquals(2, segments.get(2).id);
+ assertEquals(0, segments.get(0).id());
+ assertEquals(1, segments.get(1).id());
+ assertEquals(2, segments.get(2).id());
}
@Test