This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3619d2f MINOR: cleanup deprectaion annotations (#6290) 3619d2f is described below commit 3619d2f383f65108dfd33686119f675aaeea54b7 Author: Matthias J. Sax <mj...@apache.org> AuthorDate: Fri Mar 8 12:31:34 2019 -0800 MINOR: cleanup deprectaion annotations (#6290) If deprecated interface methods are inherited, the @Deprecated annotation should be used (instead of suppressing the deprecation warning). Reviewers: Guozhang Wang <wangg...@gmail.com>, John Roesler <j...@confluent.io>, Bill Bejeck <bbej...@gmail.com> --- .../apache/kafka/streams/kstream/JoinWindows.java | 14 +---- .../kafka/streams/kstream/SessionWindows.java | 2 - .../apache/kafka/streams/kstream/TimeWindows.java | 14 ++--- .../kafka/streams/kstream/UnlimitedWindows.java | 9 +-- .../org/apache/kafka/streams/kstream/Windows.java | 3 +- .../streams/kstream/internals/SessionWindow.java | 2 - .../internals/AbstractProcessorContext.java | 4 -- .../ForwardingDisabledProcessorContext.java | 6 +- .../internals/GlobalProcessorContextImpl.java | 4 +- .../internals/InternalProcessorContext.java | 3 + .../processor/internals/ProcessorContextImpl.java | 36 ++++++------ .../processor/internals/StandbyContextImpl.java | 8 +-- .../org/apache/kafka/streams/state/Stores.java | 14 ++--- .../apache/kafka/streams/state/WindowStore.java | 15 +++-- .../state/internals/CachingWindowStore.java | 6 +- .../internals/ChangeLoggingWindowBytesStore.java | 6 +- .../internals/CompositeReadOnlyWindowStore.java | 66 ++++++++++++++-------- .../state/internals/MeteredWindowStore.java | 6 +- .../state/internals/RocksDBWindowStore.java | 6 +- .../internals/AbstractProcessorContextTest.java | 20 ++++--- .../ForwardingDisabledProcessorContextTest.java | 2 + .../internals/GlobalProcessorContextImplTest.java | 2 + .../processor/internals/ProcessorTopologyTest.java | 4 +- .../internals/RecordDeserializerTest.java | 3 +- .../kafka/test/InternalMockProcessorContext.java | 14 ++--- 
.../kafka/test/MockInternalProcessorContext.java | 8 +-- .../apache/kafka/test/NoOpProcessorContext.java | 22 ++++---- .../streams/processor/MockProcessorContext.java | 16 ++++-- 28 files changed, 165 insertions(+), 150 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 219489f..6331877 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -92,7 +92,7 @@ public final class JoinWindows extends Windows<Window> { this.maintainDurationMs = maintainDurationMs; } - @SuppressWarnings("deprecation") // removing segments from Windows will fix this + @Deprecated // removing segments from Windows will fix this private JoinWindows(final long beforeMs, final long afterMs, final long graceMs, @@ -131,7 +131,6 @@ public final class JoinWindows extends Windows<Window> { * @param timeDifference join window interval * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds} */ - @SuppressWarnings("deprecation") public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); return of(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix)); @@ -148,7 +147,6 @@ public final class JoinWindows extends Windows<Window> { * @throws IllegalArgumentException if the resulting window size is negative * @deprecated Use {@link #before(Duration)} instead. 
*/ - @SuppressWarnings("deprecation") // removing segments from Windows will fix this @Deprecated public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException { return new JoinWindows(timeDifferenceMs, afterMs, graceMs, maintainDurationMs, segments); @@ -164,7 +162,6 @@ public final class JoinWindows extends Windows<Window> { * @param timeDifference relative window start time * @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds} */ - @SuppressWarnings("deprecation") // removing segments from Windows will fix this public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); return before(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix)); @@ -181,7 +178,6 @@ public final class JoinWindows extends Windows<Window> { * @throws IllegalArgumentException if the resulting window size is negative * @deprecated Use {@link #after(Duration)} instead */ - @SuppressWarnings("deprecation") // removing segments from Windows will fix this @Deprecated public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException { return new JoinWindows(beforeMs, timeDifferenceMs, graceMs, maintainDurationMs, segments); @@ -197,7 +193,6 @@ public final class JoinWindows extends Windows<Window> { * @param timeDifference relative window end time * @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds} */ - @SuppressWarnings("deprecation") // removing segments from Windows will fix this public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); return after(ApiUtils.validateMillisecondDuration(timeDifference, 
msgPrefix)); @@ -239,7 +234,6 @@ public final class JoinWindows extends Windows<Window> { return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, maintainDurationMs, segments); } - @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode @Override public long gracePeriodMs() { // NOTE: in the future, when we remove maintainMs, @@ -254,7 +248,6 @@ public final class JoinWindows extends Windows<Window> { * @throws IllegalArgumentException if {@code durationMs} is smaller than the window size * @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)} instead. */ - @SuppressWarnings("deprecation") @Override @Deprecated public JoinWindows until(final long durationMs) throws IllegalArgumentException { @@ -272,14 +265,13 @@ public final class JoinWindows extends Windows<Window> { * @return the window maintain duration * @deprecated since 2.1. This function should not be used anymore as retention period can be specified via {@link Materialized#withRetention(Duration)}. 
*/ - @SuppressWarnings({"deprecation", "deprecatedMemberStillInUse"}) @Override @Deprecated public long maintainMs() { return Math.max(maintainDurationMs, size()); } - @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this + @SuppressWarnings("deprecation") // removing segments from Windows will fix this @Override public boolean equals(final Object o) { if (this == o) { @@ -296,7 +288,7 @@ public final class JoinWindows extends Windows<Window> { graceMs == that.graceMs; } - @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this + @SuppressWarnings("deprecation") // removing segments from Windows will fix this @Override public int hashCode() { return Objects.hash(beforeMs, afterMs, graceMs, maintainDurationMs, segments); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java index 9c77fa5..c0153a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java @@ -108,7 +108,6 @@ public final class SessionWindows { * * @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds} */ - @SuppressWarnings("deprecation") public static SessionWindows with(final Duration inactivityGap) { final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap"); return with(ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix)); @@ -163,7 +162,6 @@ public final class SessionWindows { @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode public long gracePeriodMs() { - // NOTE: in the future, when we remove maintainMs, // we should default the grace period to 24h to maintain the default 
behavior, // or we can default to (24h - gapMs) if you want to be super accurate. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index 03203f0..a87dbf3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -79,7 +79,6 @@ public final class TimeWindows extends Windows<TimeWindow> { } /** Private constructor for preserving segments. Can be removed along with Windows.segments. **/ - @SuppressWarnings("DeprecatedIsStillUsed") @Deprecated private TimeWindows(final long sizeMs, final long advanceMs, @@ -127,7 +126,7 @@ public final class TimeWindows extends Windows<TimeWindow> { * @return a new window definition with default maintain duration of 1 day * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds} */ - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // removing #of(final long sizeMs) will fix this public static TimeWindows of(final Duration size) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size"); return of(ApiUtils.validateMillisecondDuration(size, msgPrefix)); @@ -145,7 +144,6 @@ public final class TimeWindows extends Windows<TimeWindow> { * @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size * @deprecated Use {@link #advanceBy(Duration)} instead */ - @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows @Deprecated public TimeWindows advanceBy(final long advanceMs) { if (advanceMs <= 0 || advanceMs > sizeMs) { @@ -166,7 +164,7 @@ public final class TimeWindows extends Windows<TimeWindow> { * @return a new window definition with default maintain duration of 1 day * @throws IllegalArgumentException if 
the advance interval is negative, zero, or larger than the window size */ - @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows + @SuppressWarnings("deprecation") // removing #advanceBy(final long advanceMs) will fix this public TimeWindows advanceBy(final Duration advance) { final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, "advance"); return advanceBy(ApiUtils.validateMillisecondDuration(advance, msgPrefix)); @@ -227,7 +225,6 @@ public final class TimeWindows extends Windows<TimeWindow> { * @deprecated since 2.1. Use {@link Materialized#retention} or directly configure the retention in a store supplier * and use {@link Materialized#as(WindowBytesStoreSupplier)}. */ - @SuppressWarnings("deprecation") @Override @Deprecated public TimeWindows until(final long durationMs) throws IllegalArgumentException { @@ -245,14 +242,13 @@ public final class TimeWindows extends Windows<TimeWindow> { * @return the window maintain duration * @deprecated since 2.1. Use {@link Materialized#retention} instead. 
*/ - @SuppressWarnings({"DeprecatedIsStillUsed", "deprecation"}) @Override @Deprecated public long maintainMs() { return Math.max(maintainDurationMs, sizeMs); } - @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this + @SuppressWarnings("deprecation") // removing segments from Windows will fix this @Override public boolean equals(final Object o) { if (this == o) { @@ -269,13 +265,13 @@ public final class TimeWindows extends Windows<TimeWindow> { graceMs == that.graceMs; } - @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this + @SuppressWarnings("deprecation") // removing segments from Windows will fix this @Override public int hashCode() { return Objects.hash(maintainDurationMs, segments, sizeMs, advanceMs, graceMs); } - @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this + @SuppressWarnings("deprecation") // removing segments from Windows will fix this @Override public String toString() { return "TimeWindows{" + diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index e1894ba..f8ec6ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -84,7 +84,6 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> { * @return a new unlimited window that starts at {@code start} * @throws IllegalArgumentException if the start time is negative or can't be represented as {@code long milliseconds} */ - @SuppressWarnings("deprecation") public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(start, "start"); return startOn(ApiUtils.validateMillisecondInstant(start, msgPrefix)); @@ 
-120,7 +119,6 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> { * @throws IllegalArgumentException on every invocation. * @deprecated since 2.1. */ - @SuppressWarnings("deprecation") @Override @Deprecated public UnlimitedWindows until(final long durationMs) { @@ -134,7 +132,6 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> { * @return the window retention time that is {@link Long#MAX_VALUE} * @deprecated since 2.1. Use {@link Materialized#retention} instead. */ - @SuppressWarnings("deprecation") @Override @Deprecated public long maintainMs() { @@ -146,7 +143,7 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> { return 0L; } - @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this + @SuppressWarnings("deprecation") // removing segments from Windows will fix this @Override public boolean equals(final Object o) { if (this == o) { @@ -159,13 +156,13 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> { return startMs == that.startMs && segments == that.segments; } - @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this + @SuppressWarnings("deprecation") // removing segments from Windows will fix this @Override public int hashCode() { return Objects.hash(startMs, segments); } - @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this + @SuppressWarnings("deprecation") // removing segments from Windows will fix this @Override public String toString() { return "UnlimitedWindows{" + diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index feaee1e..e122b4a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -46,7 +46,7 @@ public 
abstract class Windows<W extends Window> { protected Windows() {} - @SuppressWarnings("deprecation") // remove this constructor when we remove segments. + @Deprecated // remove this constructor when we remove segments. Windows(final int segments) { this.segments = segments; } @@ -77,7 +77,6 @@ public abstract class Windows<W extends Window> { * @return the window maintain duration * @deprecated since 2.1. Use {@link Materialized#retention} instead. */ - @SuppressWarnings("DeprecatedIsStillUsed") @Deprecated public long maintainMs() { return maintainDurationMs; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java index 8111cdf..3057e32 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.streams.kstream.Window; /** @@ -29,7 +28,6 @@ import org.apache.kafka.streams.kstream.Window; * @see org.apache.kafka.streams.kstream.SessionWindows * @see org.apache.kafka.streams.processor.TimestampExtractor */ -@InterfaceStability.Unstable public final class SessionWindow extends Window { /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index af8b073..ef1799b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -127,7 +127,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte if (recordContext == null) { throw 
new IllegalStateException("This should not happen as partition() should only be called while a record is processed"); } - return recordContext.partition(); } @@ -139,7 +138,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte if (recordContext == null) { throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed"); } - return recordContext.offset(); } @@ -148,7 +146,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte if (recordContext == null) { throw new IllegalStateException("This should not happen as headers() should only be called while a record is processed"); } - return recordContext.headers(); } @@ -160,7 +157,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte if (recordContext == null) { throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed"); } - return recordContext.timestamp(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java index 0ef70b7..ba39368 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.time.Duration; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsMetrics; @@ -31,6 +30,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import java.io.File; +import java.time.Duration; import java.util.Map; import java.util.Objects; @@ -110,14 +110,14 @@ 
public final class ForwardingDisabledProcessorContext implements ProcessorContex throw new StreamsException("ProcessorContext#forward() not supported."); } - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final int childIndex) { throw new StreamsException("ProcessorContext#forward() not supported."); } - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final String childName) { throw new StreamsException("ProcessorContext#forward() not supported."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 900cc71..0693ef7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -88,8 +88,8 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext { /** * @throws UnsupportedOperationException on every invocation */ - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final int childIndex) { throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context."); } @@ -97,8 +97,8 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext { /** * @throws UnsupportedOperationException on every invocation */ - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final String childName) { throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 0f67dff..2a1d05e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -47,6 +47,9 @@ public interface InternalProcessorContext extends ProcessorContext { */ void setCurrentNode(ProcessorNode currentNode); + /** + * Get the current {@link ProcessorNode} + */ ProcessorNode currentNode(); /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 764d50c..2afd5e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -128,8 +128,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re forward(key, value, SEND_TO_ALL); } - @SuppressWarnings({"unchecked", "deprecation"}) + @SuppressWarnings("unchecked") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final int childIndex) { @@ -139,8 +140,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name())); } - @SuppressWarnings({"unchecked", "deprecation"}) + @SuppressWarnings("unchecked") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final String childName) { @@ -192,16 +194,16 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @Override @Deprecated - public Cancellable schedule(final long interval, + public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) { - if (interval < 1) { + if 
(intervalMs < 1) { throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond."); } - return task.schedule(interval, type, callback); + return task.schedule(intervalMs, type, callback); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this @Override public Cancellable schedule(final Duration interval, final PunctuationType type, @@ -315,16 +317,16 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re return wrapped().fetch(key, time); } - @Deprecated @Override + @Deprecated public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { return wrapped().fetch(key, timeFrom, timeTo); } - @Deprecated @Override + @Deprecated public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, @@ -337,8 +339,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re return wrapped().all(); } - @Deprecated @Override + @Deprecated public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) { return wrapped().fetchAll(timeFrom, timeTo); @@ -505,7 +507,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re return wrapped().fetch(key, time); } - @Deprecated + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed @Override public WindowStoreIterator<V> fetch(final K key, final long timeFrom, @@ -513,7 +515,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re return wrapped().fetch(key, timeFrom, timeTo); } - @Deprecated + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) 
is removed @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, @@ -522,17 +524,17 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re return wrapped().fetch(from, to, timeFrom, timeTo); } - @Override - public KeyValueIterator<Windowed<K>, V> all() { - return wrapped().all(); - } - - @Deprecated + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed @Override public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) { return wrapped().fetchAll(timeFrom, timeTo); } + + @Override + public KeyValueIterator<Windowed<K>, V> all() { + return wrapped().all(); + } } private static class TimestampedWindowStoreReadWriteDecorator<K, V> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index ee69373..49dc5f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.time.Duration; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; @@ -33,6 +32,7 @@ import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; +import java.time.Duration; import java.util.Collections; import java.util.Map; @@ -161,8 +161,8 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle /** * @throws UnsupportedOperationException on every invocation */ - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V 
value, final int childIndex) { throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks."); } @@ -170,8 +170,8 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle /** * @throws UnsupportedOperationException on every invocation */ - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final String childName) { throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks."); } @@ -188,7 +188,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle * @throws UnsupportedOperationException on every invocation */ @Override - @SuppressWarnings("deprecation") + @Deprecated public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) { throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 113e531..ac2a023 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -196,7 +196,7 @@ public class Stores { * @return an instance of {@link WindowBytesStoreSupplier} * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead */ - @Deprecated + @Deprecated // continuing to support Windows#maintainMs/segmentInterval in fallback mode public static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriod, final int numSegments, @@ -271,21 +271,21 @@ public class Stores { /** * Create a persistent {@link SessionBytesStoreSupplier}. 
* @param name name of the store (cannot be {@code null}) - * @param retentionPeriod length ot time to retain data in the store (cannot be negative) + * @param retentionPeriodMs length ot time to retain data in the store (cannot be negative) * Note that the retention period must be at least long enough to contain the * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period. * @return an instance of a {@link SessionBytesStoreSupplier} * @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead */ - @Deprecated + @Deprecated // continuing to support Windows#maintainMs/segmentInterval in fallback mode public static SessionBytesStoreSupplier persistentSessionStore(final String name, - final long retentionPeriod) { + final long retentionPeriodMs) { Objects.requireNonNull(name, "name cannot be null"); - if (retentionPeriod < 0) { + if (retentionPeriodMs < 0) { throw new IllegalArgumentException("retentionPeriod cannot be negative"); } - return new RocksDbSessionBytesStoreSupplier(name, retentionPeriod); + return new RocksDbSessionBytesStoreSupplier(name, retentionPeriodMs); } /** @@ -297,7 +297,7 @@ public class Stores { * and for the entire grace period. 
* @return an instance of a {@link SessionBytesStoreSupplier} */ - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // removing #persistentSessionStore(String name, long retentionPeriodMs) will fix this public static SessionBytesStoreSupplier persistentSessionStore(final String name, final Duration retentionPeriod) { final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index f7eb37e..83a0ee1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -91,11 +91,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> * @throws InvalidStateStoreException if the store is not initialized * @throws NullPointerException if the given key is {@code null} */ - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo); @Override - default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) { + default WindowStoreIterator<V> fetch(final K key, + final Instant from, + final Instant to) { return fetch( key, ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")), @@ -115,11 +117,14 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> * @throws InvalidStateStoreException if the store is not initialized * @throws NullPointerException if one of the given keys is {@code null} */ - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) 
is removed KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo); @Override - default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) { + default KeyValueIterator<Windowed<K>, V> fetch(final K from, + final K to, + final Instant fromTime, + final Instant toTime) { return fetch( from, to, @@ -135,7 +140,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>} * @throws InvalidStateStoreException if the store is not initialized */ - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo); @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 0a869da..0edd8f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -165,7 +165,7 @@ class CachingWindowStore } } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed @Override public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, @@ -190,7 +190,7 @@ class CachingWindowStore return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) 
is removed @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to, @@ -221,7 +221,7 @@ class CachingWindowStore ); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index ef5a4c7..c58e9f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -65,7 +65,7 @@ class ChangeLoggingWindowBytesStore return wrapped().fetch(key, timestamp); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed @Override public WindowStoreIterator<byte[]> fetch(final Bytes key, final long from, @@ -73,7 +73,7 @@ class ChangeLoggingWindowBytesStore return wrapped().fetch(key, from, to); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, @@ -87,7 +87,7 @@ class ChangeLoggingWindowBytesStore return wrapped().all(); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) 
is removed @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java index 0908085..fbfc7a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java @@ -69,7 +69,9 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K @Override @Deprecated - public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { + public WindowStoreIterator<V> fetch(final K key, + final long timeFrom, + final long timeTo) { Objects.requireNonNull(key, "key can't be null"); final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType); for (final ReadOnlyWindowStore<K, V> windowStore : stores) { @@ -89,29 +91,39 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K return KeyValueIterators.emptyWindowStoreIterator(); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // removing fetch(K from, long from, long to) will fix this @Override - public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException { + public WindowStoreIterator<V> fetch(final K key, + final Instant from, + final Instant to) throws IllegalArgumentException { return fetch( key, ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")), ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"))); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // removing fetch(K from, K to, long from, long to) will fix this @Override - public 
KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { + public KeyValueIterator<Windowed<K>, V> fetch(final K from, + final K to, + final long timeFrom, + final long timeTo) { Objects.requireNonNull(from, "from can't be null"); Objects.requireNonNull(to, "to can't be null"); - final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = store -> store.fetch(from, to, timeFrom, timeTo); - return new DelegatingPeekingKeyValueIterator<>(storeName, - new CompositeKeyValueIterator<>( - provider.stores(storeName, windowStoreType).iterator(), - nextIteratorFunction)); + final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = + store -> store.fetch(from, to, timeFrom, timeTo); + return new DelegatingPeekingKeyValueIterator<>( + storeName, + new CompositeKeyValueIterator<>( + provider.stores(storeName, windowStoreType).iterator(), + nextIteratorFunction)); } @Override - public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException { + public KeyValueIterator<Windowed<K>, V> fetch(final K from, + final K to, + final Instant fromTime, + final Instant toTime) throws IllegalArgumentException { return fetch( from, to, @@ -121,26 +133,32 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K @Override public KeyValueIterator<Windowed<K>, V> all() { - final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = ReadOnlyWindowStore::all; - return new DelegatingPeekingKeyValueIterator<>(storeName, - new CompositeKeyValueIterator<>( - provider.stores(storeName, windowStoreType).iterator(), - nextIteratorFunction)); + final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = + ReadOnlyWindowStore::all; + return new DelegatingPeekingKeyValueIterator<>( + storeName, + new 
CompositeKeyValueIterator<>( + provider.stores(storeName, windowStoreType).iterator(), + nextIteratorFunction)); } @Override @Deprecated - public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) { - final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = store -> store.fetchAll(timeFrom, timeTo); - return new DelegatingPeekingKeyValueIterator<>(storeName, - new CompositeKeyValueIterator<>( - provider.stores(storeName, windowStoreType).iterator(), - nextIteratorFunction)); + public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, + final long timeTo) { + final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = + store -> store.fetchAll(timeFrom, timeTo); + return new DelegatingPeekingKeyValueIterator<>( + storeName, + new CompositeKeyValueIterator<>( + provider.stores(storeName, windowStoreType).iterator(), + nextIteratorFunction)); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // removing fetchAll(long from, long to) will fix this @Override - public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { + public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, + final Instant to) throws IllegalArgumentException { return fetchAll( ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")), ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"))); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 681b210..6d2eaab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -159,7 +159,7 @@ public class 
MeteredWindowStore<K, V> } } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed @Override public WindowStoreIterator<V> fetch(final K key, final long timeFrom, @@ -171,7 +171,7 @@ public class MeteredWindowStore<K, V> time); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, @@ -185,7 +185,7 @@ public class MeteredWindowStore<K, V> time); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed @Override public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index e621290..3b634eb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -69,14 +69,14 @@ public class RocksDBWindowStore return bytesValue; } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed @Override public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) { final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(key, timeFrom, timeTo); return new WindowStoreIteratorWrapper(bytesIterator, windowSize).valuesIterator(); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) 
is removed @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to, @@ -92,7 +92,7 @@ public class RocksDBWindowStore return new WindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator(); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(timeFrom, timeTo); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 8afd302..1548e7e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.time.Duration; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; @@ -36,6 +35,7 @@ import org.apache.kafka.test.MockKeyValueStore; import org.junit.Before; import org.junit.Test; +import java.time.Duration; import java.util.Properties; import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; @@ -171,12 +171,16 @@ public class AbstractProcessorContextTest { @SuppressWarnings("unchecked") @Test public void appConfigsShouldReturnParsedValues() { - assertThat((Class<RocksDBConfigSetter>) context.appConfigs().get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG), equalTo(RocksDBConfigSetter.class)); + assertThat( + context.appConfigs().get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG), + equalTo(RocksDBConfigSetter.class)); } @Test 
public void appConfigsShouldReturnUnrecognizedValues() { - assertThat((String) context.appConfigs().get("user.supplied.config"), equalTo("user-suppplied-value")); + assertThat( + context.appConfigs().get("user.supplied.config"), + equalTo("user-suppplied-value")); } @@ -198,9 +202,11 @@ public class AbstractProcessorContextTest { return null; } - @SuppressWarnings("deprecation") @Override - public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) { + @Deprecated + public Cancellable schedule(final long interval, + final PunctuationType type, + final Punctuator callback) { return null; } @@ -217,12 +223,12 @@ public class AbstractProcessorContextTest { @Override public <K, V> void forward(final K key, final V value, final To to) {} - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final int childIndex) {} - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final String childName) {} @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java index 03e79b7..c6b2cbe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java @@ -47,11 +47,13 @@ public class ForwardingDisabledProcessorContextTest { context.forward("key", "value", To.all()); } + @SuppressWarnings("deprecation") // need to test deprecated code until removed @Test(expected = StreamsException.class) public void shouldThrowOnForwardWithChildIndex() { context.forward("key", "value", 1); } + @SuppressWarnings("deprecation") // need to test deprecated code until removed 
@Test(expected = StreamsException.class) public void shouldThrowOnForwardWithChildName() { context.forward("key", "value", "child1"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java index deb14e9..4153cca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -106,11 +106,13 @@ public class GlobalProcessorContextImplTest { globalContext.forward(null, null, To.all()); } + @SuppressWarnings("deprecation") // need to test deprecated code until removed @Test(expected = UnsupportedOperationException.class) public void shouldNotSupportForwardingViaChildIndex() { globalContext.forward(null, null, 0); } + @SuppressWarnings("deprecation") // need to test deprecated code until removed @Test(expected = UnsupportedOperationException.class) public void shouldNotSupportForwardingViaChildName() { globalContext.forward(null, null, "processorName"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 6a7bd02..1e3fad3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -611,7 +611,7 @@ public class ProcessorTopologyTest { this.numChildren = numChildren; } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // need to test deprecated code until removed @Override public void process(final String key, final String value) { for (int i = 0; i != numChildren; ++i) { @@ -631,7 +631,7 @@ public class ProcessorTopologyTest { 
this.numChildren = numChildren; } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // need to test deprecated code until removed @Override public void process(final String key, final String value) { for (int i = 0; i != numChildren; ++i) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java index 2f2587c..3b38b7f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java @@ -45,7 +45,6 @@ public class RecordDeserializerTest { new byte[0], headers); - @SuppressWarnings("deprecation") @Test public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() { @@ -81,7 +80,7 @@ public class RecordDeserializerTest { final boolean valueThrowsException, final Object key, final Object value) { - super("", Collections.<String>emptyList(), null, null); + super("", Collections.emptyList(), null, null); this.keyThrowsException = keyThrowsException; this.valueThrowsException = valueThrowsException; this.key = key; diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index c9255ce..4b92679 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -159,7 +159,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple this.valSerde = valSerde; } - // serdes will override whatever specified in the configs @Override public Serde<?> keySerde() { return keySerde; @@ -179,7 +178,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple if (stateDir == null) { throw new 
UnsupportedOperationException("State directory not specified"); } - return stateDir; } @@ -195,8 +193,8 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple return storeMap.get(name); } - @SuppressWarnings("deprecation") @Override + @Deprecated public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) { throw new UnsupportedOperationException("schedule() not supported."); } @@ -209,22 +207,24 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple } @Override - public void commit() { } + public void commit() {} - @Override @SuppressWarnings("unchecked") + @Override public <K, V> void forward(final K key, final V value) { forward(key, value, To.all()); } + @SuppressWarnings("unchecked") @Override - @SuppressWarnings({"unchecked", "deprecation"}) + @Deprecated public <K, V> void forward(final K key, final V value, final int childIndex) { forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name())); } + @SuppressWarnings("unchecked") @Override - @SuppressWarnings({"unchecked", "deprecation"}) + @Deprecated public <K, V> void forward(final K key, final V value, final String childName) { forward(key, value, To.child(childName)); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java index 62a8491..5ae97c9 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java @@ -63,12 +63,8 @@ public class MockInternalProcessorContext extends MockProcessorContext implement } @Override - public void initialize() { - - } + public void initialize() {} @Override - public void uninitialize() { - - } + public void uninitialize() {} } \ No newline at end of file diff --git 
a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index c7c8343..77dd418 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.test; -import java.time.Duration; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.Cancellable; @@ -29,19 +28,21 @@ import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Properties; public class NoOpProcessorContext extends AbstractProcessorContext { public boolean initialized; + @SuppressWarnings("WeakerAccess") public Map<Object, Object> forwardedValues = new HashMap<>(); public NoOpProcessorContext() { super(new TaskId(1, 1), streamsConfig(), new MockStreamsMetrics(new Metrics()), null, null); } - static StreamsConfig streamsConfig() { + private static StreamsConfig streamsConfig() { final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "boot"); @@ -53,9 +54,11 @@ public class NoOpProcessorContext extends AbstractProcessorContext { return null; } - @SuppressWarnings("deprecation") @Override - public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) { + @Deprecated + public Cancellable schedule(final long interval, + final PunctuationType type, + final Punctuator callback) { return null; } @@ -76,21 +79,20 @@ public class NoOpProcessorContext extends AbstractProcessorContext { forwardedValues.put(key, value); } - 
@SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final int childIndex) { forward(key, value); } - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final String childName) { forward(key, value); } @Override - public void commit() { - } + public void commit() {} @Override public void initialize() { @@ -99,7 +101,5 @@ public class NoOpProcessorContext extends AbstractProcessorContext { @Override public void register(final StateStore store, - final StateRestoreCallback stateRestoreCallback) { - // no-op - } + final StateRestoreCallback stateRestoreCallback) {} } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index 7b4c58b..34a7ed9 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -275,7 +275,11 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S * @param timestamp A record timestamp */ @SuppressWarnings({"WeakerAccess", "unused"}) - public void setRecordMetadata(final String topic, final int partition, final long offset, final Headers headers, final long timestamp) { + public void setRecordMetadata(final String topic, + final int partition, + final long offset, + final Headers headers, + final long timestamp) { this.topic = topic; this.partition = partition; this.offset = offset; @@ -390,7 +394,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S @Override @Deprecated - public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) { + public Cancellable schedule(final long intervalMs, + final PunctuationType type, + final 
Punctuator callback) { final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(intervalMs, type, callback); punctuators.add(capturedPunctuator); @@ -398,7 +404,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S return capturedPunctuator::cancel; } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this @Override public Cancellable schedule(final Duration interval, final PunctuationType type, @@ -433,8 +439,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S ); } - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final int childIndex) { throw new UnsupportedOperationException( "Forwarding to a child by index is deprecated. " + @@ -442,8 +448,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S ); } - @SuppressWarnings("deprecation") @Override + @Deprecated public <K, V> void forward(final K key, final V value, final String childName) { throw new UnsupportedOperationException( "Forwarding to a child by name is deprecated. " +