Repository: kafka Updated Branches: refs/heads/1.0 dc6bfa553 -> dc2efd5c3
MINOR: a few web doc and javadoc fixes 1. Added missing Javadocs in public interfaces. 2. Added missing upgrade web docs. 3. Minor improvements on exception messages. Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Bill Bejeck <b...@confluent.io>, Damian Guy <damian....@gmail.com>, Matthias J. Sax <matth...@confluent.io>, Antony Stubbs <antony.stu...@gmail.com> Closes #4071 from guozhangwang/KMinor-javadoc-gaps (cherry picked from commit ef4914520019e941827dac8eda6000a82cb74cc5) Signed-off-by: Guozhang Wang <wangg...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc2efd5c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc2efd5c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc2efd5c Branch: refs/heads/1.0 Commit: dc2efd5c3b9ff341c732c64ec33dfd6ce60c8a17 Parents: dc6bfa5 Author: Guozhang Wang <wangg...@gmail.com> Authored: Mon Oct 16 16:50:59 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Mon Oct 16 16:51:08 2017 -0700 ---------------------------------------------------------------------- docs/streams/upgrade-guide.html | 22 ++++++++++++++------ .../org/apache/kafka/streams/KafkaStreams.java | 9 +++++--- .../kafka/streams/processor/Cancellable.java | 9 +++++++- .../kafka/streams/processor/Punctuator.java | 8 ++++++- .../apache/kafka/streams/KafkaStreamsTest.java | 6 +++--- 5 files changed, 40 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/dc2efd5c/docs/streams/upgrade-guide.html ---------------------------------------------------------------------- diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index b7bf19a..2974058 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -103,21 +103,31 @@ </ul> <p> - Deprecated methods in <code>KafkaStreams</code>: + Deprecated / modified 
methods in <code>KafkaStreams</code>: </p> <ul> - <li><code>toString()</code>, <code>toString(final String indent)</code> were previously used to return static and runtime information. + <li> + <code>toString()</code>, <code>toString(final String indent)</code> were previously used to return static and runtime information. They have been deprecated in favor of using the new classes/methods <code>#localThreadsMetadata()</code> / <code>ThreadMetadata</code> (returning runtime information) and <code>TopologyDescription</code> / <code>Topology#describe()</code> (returning static information). </li> - <li>With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a> + <li> + With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a> you should no longer pass in <code>Serde</code> to <code>KStream#print</code> operations. If you can't rely on using <code>toString</code> to print your keys and values, you should instead provide a custom <code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> call. </li> <li> - Windowed aggregations have moved from <code>KGroupedStream</code> to <code>WindowedKStream</code>. + <code>setStateListener()</code> can now only be set before the application starts running, i.e. before <code>KafkaStreams.start()</code> is called. + </li> + </ul> + + <p> + Deprecated methods in <code>KGroupedStream</code> + </p> + <ul> + <li> + Windowed aggregations have been deprecated from <code>KGroupedStream</code> and moved to <code>WindowedKStream</code>. You can now perform a windowed aggregation by, for example, using <code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>. - Note: the previous aggregate functions on <code>KGroupedStream</code> still work, but have been deprecated. 
</li> </ul> @@ -216,7 +226,7 @@ <li> Then make a call to <code>ReadOnlyKeyValueStore.all()</code> to iterate over the keys of a <code>KTable</code>. </li> </ul> <p> - If you want to view the changelog stream of the <code>KTable</code> then you could call <code>KTable.toStream().print()</code>. + If you want to view the changelog stream of the <code>KTable</code> then you could call <code>KTable.toStream().print(Printed.toSysOut())</code>. </p> <p> Metrics using exactly-once semantics: </p> http://git-wip-us.apache.org/repos/asf/kafka/blob/dc2efd5c/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index f5eb0a0..ae4ef34 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -336,7 +336,8 @@ public class KafkaStreams { if (state == State.CREATED) { stateListener = listener; } else { - throw new IllegalStateException("Can only set StateListener in CREATED state."); + throw new IllegalStateException("Can only set StateListener in CREATED state. " + + "Current state is: " + state); } } @@ -357,7 +358,8 @@ public class KafkaStreams { globalStreamThread.setUncaughtExceptionHandler(eh); } } else { - throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state."); + throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + + "Current state is: " + state); } } @@ -372,7 +374,8 @@ public class KafkaStreams { if (state == State.CREATED) { this.globalStateRestoreListener = globalStateRestoreListener; } else { - throw new IllegalStateException("Can only set the GlobalRestoreListener in the CREATED state"); + throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED state. 
" + + "Current state is: " + state); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/dc2efd5c/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java index 82c9edd..2e56b56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java @@ -16,8 +16,15 @@ */ package org.apache.kafka.streams.processor; +/** + * Cancellable interface returned in {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)}. + * + * @see Punctuator + */ public interface Cancellable { + /** + * Cancel the scheduled operation to avoid future calls. + */ void cancel(); - } http://git-wip-us.apache.org/repos/asf/kafka/blob/dc2efd5c/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java index 200c1af..407270f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java @@ -18,9 +18,15 @@ package org.apache.kafka.streams.processor; /** * A functional interface used as an argument to {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)}. + * + * @see Cancellable */ public interface Punctuator { + /** + * Perform the scheduled periodic operation. 
+ * + * @param timestamp when the operation is being called, depending on {@link PunctuationType} + */ void punctuate(long timestamp); - } http://git-wip-us.apache.org/repos/asf/kafka/blob/dc2efd5c/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 4bd2890..69b4584 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -266,7 +266,7 @@ public class KafkaStreamsTest { streams.setGlobalStateRestoreListener(new MockStateRestoreListener()); fail("Should throw an IllegalStateException"); } catch (final IllegalStateException e) { - Assert.assertEquals("Can only set the GlobalRestoreListener in the CREATED state", e.getMessage()); + // expected } finally { streams.close(); } @@ -279,7 +279,7 @@ public class KafkaStreamsTest { streams.setUncaughtExceptionHandler(null); fail("Should throw IllegalStateException"); } catch (final IllegalStateException e) { - Assert.assertEquals("Can only set UncaughtExceptionHandler in CREATED state.", e.getMessage()); + // expected } finally { streams.close(); } @@ -292,7 +292,7 @@ public class KafkaStreamsTest { streams.setStateListener(null); fail("Should throw IllegalStateException"); } catch (final IllegalStateException e) { - Assert.assertEquals("Can only set StateListener in CREATED state.", e.getMessage()); + // expected } finally { streams.close(); }