mjsax commented on code in PR #17973:
URL: https://github.com/apache/kafka/pull/17973#discussion_r1866949109
##########
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+ protected final Optional<Long> duration;
+
+ protected AutoOffsetReset(Optional<Long> duration) {
+ this.duration = duration;
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance representing "latest".
+ *
+ * @return an AutoOffsetReset instance for the "latest" offset.
+ */
+ public static AutoOffsetReset latest() {
+ return new AutoOffsetReset(Optional.empty());
Review Comment:
Using `empty` and `0L` might be rather subtle to encode "latest" and
"earliest".
I might be easier to follow the code, if we add an (non-public and nested)
enum with three value LATEST, EARLIEST, BY_DURATION to be more explicit. (I
think package-private should do the trick? `private` might not work down the
road, but we could also start with `private` and open it up only if necessary
in a follow up PR)
##########
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+ protected final Optional<Long> duration;
+
+ protected AutoOffsetReset(Optional<Long> duration) {
+ this.duration = duration;
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance representing "latest".
+ *
+ * @return an AutoOffsetReset instance for the "latest" offset.
+ */
+ public static AutoOffsetReset latest() {
+ return new AutoOffsetReset(Optional.empty());
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance representing "earliest".
+ *
+ * @return an AutoOffsetReset instance for the "earliest" offset.
+ */
+ public static AutoOffsetReset earliest() {
+ return new AutoOffsetReset(Optional.of(0L));
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance with a custom duration.
+ *
+ * @param duration the duration to use for the offset reset; must be
non-negative.
+ * @return an AutoOffsetReset instance with the specified duration.
+ * @throws IllegalArgumentException if the duration is negative.
+ */
+ public static AutoOffsetReset duration(Duration duration) {
+ if (duration.isNegative()) {
+ throw new IllegalArgumentException("Duration cannot be negative");
+ }
+ return new AutoOffsetReset(Optional.of(duration.toMillis()));
+ }
+
+ /**
+ * Retrieves the offset reset duration if specified.
+ *
+ * @return an Optional containing the duration in milliseconds, or empty
if "latest".
+ */
+ public Optional<Long> getDuration() {
+ return duration;
+ }
+
+ @Override
+ public String toString() {
Review Comment:
We don't need to overwrite `toString()`.
##########
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+ protected final Optional<Long> duration;
+
+ protected AutoOffsetReset(Optional<Long> duration) {
Review Comment:
```suggestion
protected AutoOffsetReset(final Optional<Long> duration) {
```
Inside the Kafka Streams code base, we use `final` whenever possible.
Applies to the whole PR. Please add `final` to all parameters, and immutable
(class member) variables.
##########
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+ protected final Optional<Long> duration;
+
+ protected AutoOffsetReset(Optional<Long> duration) {
+ this.duration = duration;
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance representing "latest".
+ *
+ * @return an AutoOffsetReset instance for the "latest" offset.
+ */
+ public static AutoOffsetReset latest() {
+ return new AutoOffsetReset(Optional.empty());
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance representing "earliest".
+ *
+ * @return an AutoOffsetReset instance for the "earliest" offset.
+ */
+ public static AutoOffsetReset earliest() {
+ return new AutoOffsetReset(Optional.of(0L));
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance with a custom duration.
+ *
+ * @param duration the duration to use for the offset reset; must be
non-negative.
+ * @return an AutoOffsetReset instance with the specified duration.
+ * @throws IllegalArgumentException if the duration is negative.
+ */
+ public static AutoOffsetReset duration(Duration duration) {
+ if (duration.isNegative()) {
+ throw new IllegalArgumentException("Duration cannot be negative");
+ }
+ return new AutoOffsetReset(Optional.of(duration.toMillis()));
+ }
+
+ /**
+ * Retrieves the offset reset duration if specified.
+ *
+ * @return an Optional containing the duration in milliseconds, or empty
if "latest".
+ */
+ public Optional<Long> getDuration() {
Review Comment:
This method should not be on this class, but we would add an
`AutoOffsetResetInternal` class inside the `internal` package and add it there
(cf the KIP discussion (or `GroupedInternal` which extends `Grouped` as an
example:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java)
##########
streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java:
##########
@@ -99,8 +99,8 @@ protected Consumed(final Consumed<K, V> consumed) {
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor
timestampExtractor,
- final Topology.AutoOffsetReset
resetPolicy) {
Review Comment:
We cannot just change this method, but need to keep it as-is, but only mark
as `@Deprecated` and add a new overload with take the new `AutoOffsetReset`
class instead of the old enum.
(Internally, we can translate the old enum to the new class right away...)
Same below.
(This comment does only apply to this class, for `ConsumedInternal` the
change is ok, as it's an internal class and we can just change it right away.)
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -77,6 +77,166 @@ protected Topology(final InternalTopologyBuilder
internalTopologyBuilder) {
public enum AutoOffsetReset {
Review Comment:
We should mark this enum as `@Deprecated` plus all the methods which use it
(or did you not do this on purpose yet, but want to do it a follow up PR?)
##########
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+ protected final Optional<Long> duration;
+
+ protected AutoOffsetReset(Optional<Long> duration) {
+ this.duration = duration;
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance representing "latest".
+ *
+ * @return an AutoOffsetReset instance for the "latest" offset.
+ */
+ public static AutoOffsetReset latest() {
+ return new AutoOffsetReset(Optional.empty());
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance representing "earliest".
+ *
+ * @return an AutoOffsetReset instance for the "earliest" offset.
+ */
+ public static AutoOffsetReset earliest() {
+ return new AutoOffsetReset(Optional.of(0L));
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance with a custom duration.
+ *
+ * @param duration the duration to use for the offset reset; must be
non-negative.
+ * @return an AutoOffsetReset instance with the specified duration.
+ * @throws IllegalArgumentException if the duration is negative.
+ */
+ public static AutoOffsetReset duration(Duration duration) {
+ if (duration.isNegative()) {
+ throw new IllegalArgumentException("Duration cannot be negative");
+ }
+ return new AutoOffsetReset(Optional.of(duration.toMillis()));
+ }
+
+ /**
+ * Retrieves the offset reset duration if specified.
+ *
+ * @return an Optional containing the duration in milliseconds, or empty
if "latest".
+ */
+ public Optional<Long> getDuration() {
+ return duration;
+ }
+
+ @Override
+ public String toString() {
+ return duration.map(d -> "Duration: " + d + "ms").orElse("Latest");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
Review Comment:
Code style: inside Kafka Streams module, we _always_ use `{...}` for blocks.
This should be:
```
if (this == o) {
return true;
}
```
Please also apply elsewhere.
##########
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##########
@@ -0,0 +1,76 @@
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+ protected final Optional<Long> duration;
+
+ protected AutoOffsetReset(Optional<Long> duration) {
+ this.duration = duration;
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance representing "latest".
+ *
+ * @return an AutoOffsetReset instance for the "latest" offset.
+ */
+ public static AutoOffsetReset latest() {
+ return new AutoOffsetReset(Optional.empty());
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance representing "earliest".
+ *
+ * @return an AutoOffsetReset instance for the "earliest" offset.
+ */
+ public static AutoOffsetReset earliest() {
+ return new AutoOffsetReset(Optional.of(0L));
+ }
+
+ /**
+ * Creates an AutoOffsetReset instance with a custom duration.
+ *
+ * @param duration the duration to use for the offset reset; must be
non-negative.
+ * @return an AutoOffsetReset instance with the specified duration.
+ * @throws IllegalArgumentException if the duration is negative.
+ */
+ public static AutoOffsetReset duration(Duration duration) {
Review Comment:
I think we should rename this to `byDuration()` to align to the consumer
offset naming "by_duration" ?
\cc @omkreddy (if we agree on this, the KIP would need to be updated.
##########
streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.AutoOffsetReset;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class AutoOffsetResetTest {
+
+ @Test
+ void testLatest() {
+ AutoOffsetReset latest = AutoOffsetReset.latest();
+ assertTrue(latest.getDuration().isEmpty(), "Latest should have an
empty duration.");
+ assertEquals("Latest", latest.toString(), "toString() should return
'Latest' for latest offset.");
+ }
+
+ @Test
+ void testEarliest() {
+ AutoOffsetReset earliest = AutoOffsetReset.earliest();
+ assertEquals(Optional.of(0L), earliest.getDuration(), "Earliest should
have a duration of 0ms.");
+ assertEquals("Duration: 0ms", earliest.toString(), "toString() should
return 'Duration: 0ms' for earliest offset.");
+ }
+
+ @Test
+ void testCustomDuration() {
+ Duration duration = Duration.ofSeconds(10);
+ AutoOffsetReset custom = AutoOffsetReset.duration(duration);
+ assertEquals(Optional.of(10000L), custom.getDuration(), "Duration
should match the specified value in milliseconds.");
+ assertEquals("Duration: 10000ms", custom.toString(), "toString()
should display the correct duration.");
+ }
+
+ @Test
+ void testNegativeDurationThrowsException() {
Review Comment:
```suggestion
void shouldThrowExceptionIfDurationIsNegative() {
```
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -77,6 +77,166 @@ protected Topology(final InternalTopologyBuilder
internalTopologyBuilder) {
public enum AutoOffsetReset {
EARLIEST, LATEST
}
+ /**
Review Comment:
nit: missing empty line
##########
streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.AutoOffsetReset;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class AutoOffsetResetTest {
+
+ @Test
+ void testLatest() {
Review Comment:
This is a bad test name... In Kafka Streams, we use very long and explicit
test name, and `shouldDoXWhenY()` or similar.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]