mjsax commented on code in PR #17973: URL: https://github.com/apache/kafka/pull/17973#discussion_r1875212818
########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { Review Comment: Maybe better `OffsetResetStrategy` ? ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { Review Comment: ```suggestion private AutoOffsetReset(final OffsetResetType type, final Optional<Long> duration) { ``` ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". Review Comment: ```suggestion * Creates an {@code AutoOffsetReset} instance representing "latest". ``` ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". + * + * @return an AutoOffsetReset instance for the "latest" offset. + */ + public static AutoOffsetReset latest() { + return new AutoOffsetReset(OffsetResetType.LATEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance representing "earliest". + * + * @return an AutoOffsetReset instance for the "earliest" offset. + */ + public static AutoOffsetReset earliest() { + return new AutoOffsetReset(OffsetResetType.EARLIEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance with a custom duration. Review Comment: ```suggestion * Creates an {@code AutoOffsetReset} instance for the specified reset duration. ``` ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". + * + * @return an AutoOffsetReset instance for the "latest" offset. + */ + public static AutoOffsetReset latest() { + return new AutoOffsetReset(OffsetResetType.LATEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance representing "earliest". + * + * @return an AutoOffsetReset instance for the "earliest" offset. + */ + public static AutoOffsetReset earliest() { + return new AutoOffsetReset(OffsetResetType.EARLIEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance with a custom duration. + * + * @param duration the duration to use for the offset reset; must be non-negative. + * @return an AutoOffsetReset instance with the specified duration. + * @throws IllegalArgumentException if the duration is negative. 
+ */ + public static AutoOffsetReset byDuration(final Duration duration) { + if (duration.isNegative()) { + throw new IllegalArgumentException("Duration cannot be negative"); + } + return new AutoOffsetReset(OffsetResetType.BY_DURATION, Optional.of(duration.toMillis())); + } + + /** + * Provides a human-readable description of the offset reset type and duration. + * + * @return a string describing the offset reset configuration. + */ + public String describe() { + switch (type) { + case LATEST: + return "Offset: Latest"; + case EARLIEST: + return "Offset: Earliest"; + case BY_DURATION: + return "Offset by duration: " + duration.map(d -> d + "ms").orElse("Invalid duration"); + default: + throw new IllegalStateException("Unexpected type: " + type); + } + } + + @Override + public boolean equals(Object o) { Review Comment: ```suggestion public boolean equals(final Object o) { ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -137,7 +161,7 @@ public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtra } /** - * Create an instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}. + * Create an instance of {@link Consumed} with a {@link org.apache.kafka.streams.AutoOffsetReset AutoOffsetReset}. Review Comment: Seems this change is not correct? This method still takes the old/deprecated enum. ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". + * + * @return an AutoOffsetReset instance for the "latest" offset. + */ + public static AutoOffsetReset latest() { + return new AutoOffsetReset(OffsetResetType.LATEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance representing "earliest". Review Comment: ```suggestion * Creates an {@code AutoOffsetReset} instance representing "earliest". 
``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -55,9 +56,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { protected Serde<K> keySerde; protected Serde<V> valueSerde; protected TimestampExtractor timestampExtractor; - protected Topology.AutoOffsetReset resetPolicy; + protected AutoOffsetReset resetPolicy; + @Deprecated + protected Topology.AutoOffsetReset resetPolicy; // Replaced with new AutoOffsetReset class introduced in 4.0. protected String processorName; + @Deprecated Review Comment: No need to mark a `private` constructor as `@Deprecated` -- it's not user-facing (also cf. my comment below -- we should only have one constructor). ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -70,6 +74,18 @@ private Consumed(final Serde<K> keySerde, this.processorName = processorName; } + private Consumed(final Serde<K> keySerde, Review Comment: We should not add a second constructor, but extend the existing one to take an additional parameter for the new `AutoOffsetReset` class. ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". + * + * @return an AutoOffsetReset instance for the "latest" offset. + */ + public static AutoOffsetReset latest() { + return new AutoOffsetReset(OffsetResetType.LATEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance representing "earliest". + * + * @return an AutoOffsetReset instance for the "earliest" offset. + */ + public static AutoOffsetReset earliest() { + return new AutoOffsetReset(OffsetResetType.EARLIEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance with a custom duration. + * + * @param duration the duration to use for the offset reset; must be non-negative. + * @return an AutoOffsetReset instance with the specified duration. Review Comment: ```suggestion * @return An {@code AutoOffsetReset} instance with the specified duration. ``` ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". + * + * @return an AutoOffsetReset instance for the "latest" offset. + */ + public static AutoOffsetReset latest() { + return new AutoOffsetReset(OffsetResetType.LATEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance representing "earliest". + * + * @return an AutoOffsetReset instance for the "earliest" offset. + */ + public static AutoOffsetReset earliest() { + return new AutoOffsetReset(OffsetResetType.EARLIEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance with a custom duration. + * + * @param duration the duration to use for the offset reset; must be non-negative. 
+ * @return an AutoOffsetReset instance with the specified duration. + * @throws IllegalArgumentException if the duration is negative. + */ + public static AutoOffsetReset byDuration(final Duration duration) { + if (duration.isNegative()) { + throw new IllegalArgumentException("Duration cannot be negative"); + } + return new AutoOffsetReset(OffsetResetType.BY_DURATION, Optional.of(duration.toMillis())); + } + + /** + * Provides a human-readable description of the offset reset type and duration. + * + * @return a string describing the offset reset configuration. + */ + public String describe() { Review Comment: I don't think we need anything like this -- also, if we wanted to add it, it would need to be part of the KIP. I would just remove it and not alter the KIP. If there is demand in the future, we can still do a follow-up KIP and add it. But my gut feeling is that we won't need it, and/or could just add an override of `toString()`. ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". + * + * @return an AutoOffsetReset instance for the "latest" offset. + */ + public static AutoOffsetReset latest() { + return new AutoOffsetReset(OffsetResetType.LATEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance representing "earliest". + * + * @return an AutoOffsetReset instance for the "earliest" offset. + */ + public static AutoOffsetReset earliest() { + return new AutoOffsetReset(OffsetResetType.EARLIEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance with a custom duration. + * + * @param duration the duration to use for the offset reset; must be non-negative. Review Comment: ```suggestion * @param duration The duration to use for the offset reset; must be non-negative. ``` ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". + * + * @return an AutoOffsetReset instance for the "latest" offset. + */ + public static AutoOffsetReset latest() { + return new AutoOffsetReset(OffsetResetType.LATEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance representing "earliest". + * + * @return an AutoOffsetReset instance for the "earliest" offset. + */ + public static AutoOffsetReset earliest() { + return new AutoOffsetReset(OffsetResetType.EARLIEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance with a custom duration. + * + * @param duration the duration to use for the offset reset; must be non-negative. + * @return an AutoOffsetReset instance with the specified duration. + * @throws IllegalArgumentException if the duration is negative. 
+ */ + public static AutoOffsetReset byDuration(final Duration duration) { + if (duration.isNegative()) { + throw new IllegalArgumentException("Duration cannot be negative"); + } + return new AutoOffsetReset(OffsetResetType.BY_DURATION, Optional.of(duration.toMillis())); + } + + /** + * Provides a human-readable description of the offset reset type and duration. + * + * @return a string describing the offset reset configuration. + */ + public String describe() { + switch (type) { + case LATEST: + return "Offset: Latest"; + case EARLIEST: + return "Offset: Earliest"; + case BY_DURATION: + return "Offset by duration: " + duration.map(d -> d + "ms").orElse("Invalid duration"); + default: + throw new IllegalStateException("Unexpected type: " + type); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AutoOffsetReset that = (AutoOffsetReset) o; Review Comment: ```suggestion final AutoOffsetReset that = (AutoOffsetReset) o; ``` ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". + * + * @return an AutoOffsetReset instance for the "latest" offset. + */ + public static AutoOffsetReset latest() { + return new AutoOffsetReset(OffsetResetType.LATEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance representing "earliest". + * + * @return an AutoOffsetReset instance for the "earliest" offset. + */ + public static AutoOffsetReset earliest() { + return new AutoOffsetReset(OffsetResetType.EARLIEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance with a custom duration. + * + * @param duration the duration to use for the offset reset; must be non-negative. + * @return an AutoOffsetReset instance with the specified duration. + * @throws IllegalArgumentException if the duration is negative. Review Comment: ```suggestion * @throws IllegalArgumentException If the duration is negative. 
``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -96,13 +112,21 @@ protected Consumed(final Consumed<K, V> consumed) { * * @return a new instance of {@link Consumed} */ + @Deprecated public static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final TimestampExtractor timestampExtractor, final Topology.AutoOffsetReset resetPolicy) { return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null); } + public static <K, V> Consumed<K, V> with(final Serde<K> keySerde, Review Comment: Missing JavaDocs -- you can basically c&p the one from the existing `with()` method that we overload. ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. 
+ */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". + * + * @return an AutoOffsetReset instance for the "latest" offset. Review Comment: ```suggestion * @return An {@code AutoOffsetReset} instance for the "latest" offset. ``` ########## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. 
+ */ +public class AutoOffsetReset { + + private enum OffsetResetType { + LATEST, + EARLIEST, + BY_DURATION + } + + private final OffsetResetType type; + private final Optional<Long> duration; + + private AutoOffsetReset(OffsetResetType type, Optional<Long> duration) { + this.type = type; + this.duration = duration; + } + + /** + * Creates an AutoOffsetReset instance representing "latest". + * + * @return an AutoOffsetReset instance for the "latest" offset. + */ + public static AutoOffsetReset latest() { + return new AutoOffsetReset(OffsetResetType.LATEST, Optional.empty()); + } + + /** + * Creates an AutoOffsetReset instance representing "earliest". + * + * @return an AutoOffsetReset instance for the "earliest" offset. Review Comment: ```suggestion * @return An {@link AutoOffsetReset} instance for the "earliest" offset. ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -147,10 +171,15 @@ public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtra * * @return a new instance of {@link Consumed} */ + @Deprecated public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) { return new Consumed<>(null, null, null, resetPolicy, null); Review Comment: add javadoc tag about deprecation ########## streams/src/main/java/org/apache/kafka/streams/Topology.java: ########## @@ -74,10 +74,172 @@ protected Topology(final InternalTopologyBuilder internalTopologyBuilder) { * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} * or {@link KTable} via {@link StreamsBuilder}. */ + @Deprecated public enum AutoOffsetReset { EARLIEST, LATEST } + /** + * Adds a new source that consumes the specified topics and forwards the records to child processor and/or sink nodes. + * The source will use the specified {@link org.apache.kafka.streams.AutoOffsetReset offset reset policy} if no committed offsets are found. 
+ * + * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found; acceptable values: earliest or latest + * @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} + * @param topics the name of one or more Kafka topics that this source is to consume + * @return itself + * @throws TopologyException if a processor is already added or if topics have already been registered by another source + */ + public synchronized Topology addSource(final AutoOffsetReset offsetReset, Review Comment: ```suggestion public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -96,13 +112,21 @@ protected Consumed(final Consumed<K, V> consumed) { * * @return a new instance of {@link Consumed} */ + @Deprecated Review Comment: Also need to add corresponding Java tag. ########## streams/src/main/java/org/apache/kafka/streams/Topology.java: ########## @@ -74,10 +74,172 @@ protected Topology(final InternalTopologyBuilder internalTopologyBuilder) { * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} * or {@link KTable} via {@link StreamsBuilder}. */ Review Comment: We also need to add a deprecation tag to the JavaDocs. ``` @deprecated Since 4.0. Use {@link org.apache.kafka.streams.AutoOffsetReset} instead. 
``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -55,9 +56,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { protected Serde<K> keySerde; protected Serde<V> valueSerde; protected TimestampExtractor timestampExtractor; - protected Topology.AutoOffsetReset resetPolicy; + protected AutoOffsetReset resetPolicy; Review Comment: We need to update `equals` and `hashCode` to consider this new member variable. ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -147,10 +171,15 @@ public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtra * * @return a new instance of {@link Consumed} Review Comment: add javadoc deprecation tag ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -147,10 +171,15 @@ public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtra * * @return a new instance of {@link Consumed} */ + @Deprecated public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) { return new Consumed<>(null, null, null, resetPolicy, null); } + public static <K, V> Consumed<K, V> with(final AutoOffsetReset resetPolicy) { Review Comment: missing javadocs ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -55,9 +56,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { protected Serde<K> keySerde; protected Serde<V> valueSerde; protected TimestampExtractor timestampExtractor; - protected Topology.AutoOffsetReset resetPolicy; + protected AutoOffsetReset resetPolicy; + @Deprecated + protected Topology.AutoOffsetReset resetPolicy; // Replaced with new AutoOffsetReset class introduced in 4.0. 
Review Comment: ```suggestion protected Topology.AutoOffsetReset legacyResetPolicy; ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -70,6 +74,18 @@ private Consumed(final Serde<K> keySerde, this.processorName = processorName; Review Comment: This constructor should just set both variables: ``` @Deprecated private Consumed(final Serde<K> keySerde, final Serde<V> valueSerde, final TimestampExtractor timestampExtractor, final Topology.AutoOffsetReset legacyResetPolicy, final AutoOffsetReset resetPolicy, final String processorName) { this.keySerde = keySerde; this.valueSerde = valueSerde; this.timestampExtractor = timestampExtractor; this.legacyResetPolicy = legacyResetPolicy; this.resetPolicy = resetPolicy; this.processorName = processorName; } ``` This will help to simplify our internal code later. ########## streams/src/main/java/org/apache/kafka/streams/Topology.java: ########## @@ -74,10 +74,172 @@ protected Topology(final InternalTopologyBuilder internalTopologyBuilder) { * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} * or {@link KTable} via {@link StreamsBuilder}. */ + @Deprecated public enum AutoOffsetReset { EARLIEST, LATEST } + /** + * Adds a new source that consumes the specified topics and forwards the records to child processor and/or sink nodes. + * The source will use the specified {@link org.apache.kafka.streams.AutoOffsetReset offset reset policy} if no committed offsets are found. + * + * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found; acceptable values: earliest or latest + * @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) 
adding processor children} + * @param topics the name of one or more Kafka topics that this source is to consume + * @return itself + * @throws TopologyException if a processor is already added or if topics have already been registered by another source + */ + public synchronized Topology addSource(final AutoOffsetReset offsetReset, Review Comment: I believe we will need to use the fully qualified class name here -- otherwise, the name will be resolved to the existing enum instead of the newly added class. -- Same below for the other newly added methods. ########## streams/src/main/java/org/apache/kafka/streams/Topology.java: ########## @@ -77,6 +77,166 @@ protected Topology(final InternalTopologyBuilder internalTopologyBuilder) { public enum AutoOffsetReset { EARLIEST, LATEST } + /** + * Adds a new source that consumes the specified topics and forwards the records to child processor and/or sink nodes. + * The source will use the specified {@link org.apache.kafka.streams.AutoOffsetReset offset reset policy} if no committed offsets are found. + * + * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found; acceptable values: earliest or latest + * @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} + * @param topics the name of one or more Kafka topics that this source is to consume + * @return itself + * @throws TopologyException if a processor is already added or if topics have already been registered by another source + */ + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final String name, + final String... 
topics) { + internalTopologyBuilder.addSource(null, name, null, null, null, topics); Review Comment: I don't see an update yet ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -96,13 +112,21 @@ protected Consumed(final Consumed<K, V> consumed) { * * @return a new instance of {@link Consumed} */ + @Deprecated public static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final TimestampExtractor timestampExtractor, final Topology.AutoOffsetReset resetPolicy) { return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null); } + public static <K, V> Consumed<K, V> with(final Serde<K> keySerde, + final Serde<V> valueSerde, + final TimestampExtractor timestampExtractor, + final AutoOffsetReset resetPolicy) { + return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null); Review Comment: We can pass `legacyResetPolicy` as `null`. ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -96,13 +112,21 @@ protected Consumed(final Consumed<K, V> consumed) { * * @return a new instance of {@link Consumed} */ + @Deprecated public static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final TimestampExtractor timestampExtractor, final Topology.AutoOffsetReset resetPolicy) { return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null); Review Comment: We need to update this call to pass in one more parameter: ``` return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, convertOldToNew(resetPolicy), null); ``` plus add a helper: ``` private static AutoOffsetReset convertOldToNew(final Topology.AutoOffsetReset resetPolicy) { if (resetPolicy == null) { return null; } return resetPolicy == Topology.AutoOffsetReset.EARLIEST ? 
AutoOffsetReset.earliest() : AutoOffsetReset.latest(); } ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java: ########## @@ -55,9 +56,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { protected Serde<K> keySerde; protected Serde<V> valueSerde; protected TimestampExtractor timestampExtractor; - protected Topology.AutoOffsetReset resetPolicy; + protected AutoOffsetReset resetPolicy; + @Deprecated + protected Topology.AutoOffsetReset resetPolicy; // Replaced with new AutoOffsetReset class introduced in 4.0. Review Comment: We cannot have two member variables named `resetPolicy` -- let's rename the old one in a meaningful way -- this also allows us to avoid the comment. ########## streams/src/main/java/org/apache/kafka/streams/Topology.java: ########## @@ -77,6 +77,166 @@ protected Topology(final InternalTopologyBuilder internalTopologyBuilder) { public enum AutoOffsetReset { Review Comment: The existing `addSource(...)` methods, which take this enum as a parameter, should also get the `@Deprecated` annotation and the corresponding `@deprecated` JavaDocs tag (linking to the newly added methods, which take the new `AutoOffsetReset` class).
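For reference, the `convertOldToNew` helper discussed in the comments on `Consumed.java` can be tried out in isolation. The following is only a minimal sketch under stated assumptions: `LegacyAutoOffsetReset` and `NewAutoOffsetReset` are hypothetical stand-ins for `Topology.AutoOffsetReset` and `org.apache.kafka.streams.AutoOffsetReset`, and the `strategy()` accessor exists purely for illustration. It is not the actual Kafka Streams code.

```java
public class ResetPolicySketch {

    // Hypothetical stand-in for the deprecated Topology.AutoOffsetReset enum.
    enum LegacyAutoOffsetReset { EARLIEST, LATEST }

    // Hypothetical stand-in for the new AutoOffsetReset class; only the two
    // factory methods relevant to the conversion are modeled here.
    static final class NewAutoOffsetReset {
        private final String strategy;

        private NewAutoOffsetReset(final String strategy) {
            this.strategy = strategy;
        }

        static NewAutoOffsetReset earliest() {
            return new NewAutoOffsetReset("earliest");
        }

        static NewAutoOffsetReset latest() {
            return new NewAutoOffsetReset("latest");
        }

        // Illustration-only accessor so the result can be inspected.
        String strategy() {
            return strategy;
        }
    }

    // Maps the legacy enum to the new class. A null input stays null so that
    // "no policy set" keeps its meaning after the conversion.
    static NewAutoOffsetReset convertOldToNew(final LegacyAutoOffsetReset legacy) {
        if (legacy == null) {
            return null;
        }
        // Comparison must use ==, not the assignment operator.
        return legacy == LegacyAutoOffsetReset.EARLIEST
            ? NewAutoOffsetReset.earliest()
            : NewAutoOffsetReset.latest();
    }

    public static void main(final String[] args) {
        System.out.println(convertOldToNew(LegacyAutoOffsetReset.EARLIEST).strategy());
        System.out.println(convertOldToNew(LegacyAutoOffsetReset.LATEST).strategy());
        System.out.println(convertOldToNew(null));
    }
}
```

With a helper of this shape, the deprecated `with(...)` overloads could populate both the legacy field and the new field from one argument, while the new overloads simply pass `null` for the legacy one.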
