Repository: kafka
Updated Branches:
  refs/heads/1.0 dc6bfa553 -> dc2efd5c3


MINOR: a few web doc and javadoc fixes

1. Added missing Javadocs in public interfaces.
2. Added missing upgrade web docs.
3. Minor improvements on exception messages.

Author: Guozhang Wang <wangg...@gmail.com>

Reviewers: Bill Bejeck <b...@confluent.io>, Damian Guy <damian....@gmail.com>, 
Matthias J. Sax <matth...@confluent.io>, Antony Stubbs <antony.stu...@gmail.com>

Closes #4071 from guozhangwang/KMinor-javadoc-gaps

(cherry picked from commit ef4914520019e941827dac8eda6000a82cb74cc5)
Signed-off-by: Guozhang Wang <wangg...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc2efd5c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc2efd5c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc2efd5c

Branch: refs/heads/1.0
Commit: dc2efd5c3b9ff341c732c64ec33dfd6ce60c8a17
Parents: dc6bfa5
Author: Guozhang Wang <wangg...@gmail.com>
Authored: Mon Oct 16 16:50:59 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Oct 16 16:51:08 2017 -0700

----------------------------------------------------------------------
 docs/streams/upgrade-guide.html                 | 22 ++++++++++++++------
 .../org/apache/kafka/streams/KafkaStreams.java  |  9 +++++---
 .../kafka/streams/processor/Cancellable.java    |  9 +++++++-
 .../kafka/streams/processor/Punctuator.java     |  8 ++++++-
 .../apache/kafka/streams/KafkaStreamsTest.java  |  6 +++---
 5 files changed, 40 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dc2efd5c/docs/streams/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b7bf19a..2974058 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -103,21 +103,31 @@
     </ul>
 
     <p>
-        Deprecated methods in <code>KafkaStreams</code>:
+        Deprecated / modified methods in <code>KafkaStreams</code>:
     </p>
     <ul>
-        <li><code>toString()</code>, <code>toString(final String 
indent)</code> were previously used to return static and runtime information.
+        <li>
+            <code>toString()</code>, <code>toString(final String 
indent)</code> were previously used to return static and runtime information.
             They have been deprecated in favor of using the new 
classes/methods <code>#localThreadsMetadata()</code> / 
<code>ThreadMetadata</code> (returning runtime information) and
             <code>TopologyDescription</code> / 
<code>Topology#describe()</code> (returning static information).
         </li>
-        <li>With the introduction of <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
+        <li>
+            With the introduction of <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
             you should no longer pass in <code>Serde</code> to 
<code>KStream#print</code> operations.
             If you can't rely on using <code>toString</code> to print your 
keys and values, you should instead provide a custom 
<code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> 
call.
         </li>
         <li>
-            Windowed aggregations have moved from <code>KGroupedStream</code> 
to <code>WindowedKStream</code>.
+            <code>setStateListener()</code> can now only be called before the 
application starts running, i.e. before <code>KafkaStreams.start()</code> is 
called.
+        </li>
+    </ul>
+
+    <p>
+        Deprecated methods in <code>KGroupedStream</code>
+    </p>
+    <ul>
+        <li>
+            Windowed aggregations have been deprecated from 
<code>KGroupedStream</code> and moved to <code>WindowedKStream</code>.
             You can now perform a windowed aggregation by, for example, using 
<code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>.
-            Note: the previous aggregate functions on 
<code>KGroupedStream</code> still work, but have been deprecated.
         </li>
     </ul>
 
@@ -216,7 +226,7 @@
         <li> Then make a call to <code>ReadOnlyKeyValueStore.all()</code> to 
iterate over the keys of a <code>KTable</code>. </li>
     </ul>
     <p>
-        If you want to view the changelog stream of the  <code>KTable</code> 
then you could call <code>KTable.toStream().print()</code>.
+        If you want to view the changelog stream of the <code>KTable</code> 
then you could call <code>KTable.toStream().print(Printed.toSysOut())</code>.
     </p>
 
     <p> Metrics using exactly-once semantics: </p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc2efd5c/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f5eb0a0..ae4ef34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -336,7 +336,8 @@ public class KafkaStreams {
         if (state == State.CREATED) {
             stateListener = listener;
         } else {
-            throw new IllegalStateException("Can only set StateListener in 
CREATED state.");
+            throw new IllegalStateException("Can only set StateListener in 
CREATED state. " +
+                    "Current state is: " + state);
         }
     }
 
@@ -357,7 +358,8 @@ public class KafkaStreams {
                 globalStreamThread.setUncaughtExceptionHandler(eh);
             }
         } else {
-            throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state.");
+            throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
         }
     }
 
@@ -372,7 +374,8 @@ public class KafkaStreams {
         if (state == State.CREATED) {
             this.globalStateRestoreListener = globalStateRestoreListener;
         } else {
-            throw new IllegalStateException("Can only set the 
GlobalRestoreListener in the CREATED state");
+            throw new IllegalStateException("Can only set 
GlobalStateRestoreListener in CREATED state. " +
+                    "Current state is: " + state);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc2efd5c/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
index 82c9edd..2e56b56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
@@ -16,8 +16,15 @@
  */
 package org.apache.kafka.streams.processor;
 
+/**
+ * Cancellable interface returned in {@link ProcessorContext#schedule(long, 
PunctuationType, Punctuator)}.
+ *
+ * @see Punctuator
+ */
 public interface Cancellable {
 
+    /**
+     * Cancel the scheduled operation to avoid future calls.
+     */
     void cancel();
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc2efd5c/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
index 200c1af..407270f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
@@ -18,9 +18,15 @@ package org.apache.kafka.streams.processor;
 
 /**
  * A functional interface used as an argument to {@link 
ProcessorContext#schedule(long, PunctuationType, Punctuator)}.
+ *
+ * @see Cancellable
  */
 public interface Punctuator {
 
+    /**
+     * Perform the scheduled periodic operation.
+     *
+     * @param timestamp when the operation is being called, depending on 
{@link PunctuationType}
+     */
     void punctuate(long timestamp);
-
 }
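
The new Javadocs describe a contract: a Punctuator is invoked periodically with a timestamp, and the Cancellable returned at scheduling time stops future invocations. The following is a minimal, self-contained sketch of that contract only. The two interfaces are local stand-ins that mirror the method signatures shown in the diff; SketchScheduler, advanceTo and PunctuatorSketch are hypothetical names invented for illustration and are not part of Kafka, and the real ProcessorContext#schedule also takes a PunctuationType, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins mirroring the single-method interfaces in the diff.
interface Punctuator {
    void punctuate(long timestamp);
}

interface Cancellable {
    void cancel();
}

// Hypothetical scheduler (NOT Kafka's ProcessorContext): time is advanced by hand.
final class SketchScheduler {
    private static final class Entry {
        final long intervalMs;
        final Punctuator callback;
        long nextFireMs;
        boolean cancelled;

        Entry(long intervalMs, Punctuator callback, long nextFireMs) {
            this.intervalMs = intervalMs;
            this.callback = callback;
            this.nextFireMs = nextFireMs;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private long nowMs = 0L;

    // Registers a periodic callback and returns a handle that stops future calls.
    Cancellable schedule(long intervalMs, Punctuator callback) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        Entry entry = new Entry(intervalMs, callback, nowMs + intervalMs);
        entries.add(entry);
        return () -> entry.cancelled = true;
    }

    // Moves the clock forward and fires every due, non-cancelled callback.
    // In this sketch the callback receives the time it was scheduled to fire.
    void advanceTo(long timestampMs) {
        nowMs = timestampMs;
        for (Entry entry : new ArrayList<>(entries)) {
            while (!entry.cancelled && entry.nextFireMs <= timestampMs) {
                entry.callback.punctuate(entry.nextFireMs);
                entry.nextFireMs += entry.intervalMs;
            }
        }
    }
}

final class PunctuatorSketch {
    static List<Long> run() {
        List<Long> fired = new ArrayList<>();
        SketchScheduler scheduler = new SketchScheduler();
        Cancellable handle = scheduler.schedule(10L, timestamp -> fired.add(timestamp));
        scheduler.advanceTo(25L); // fires for 10 and 20
        handle.cancel();          // no calls after this point
        scheduler.advanceTo(50L);
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [10, 20]
    }
}
```

Because Punctuator has a single method, a lambda can be passed directly, and keeping the returned Cancellable is the only way in this sketch to stop the periodic call.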

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc2efd5c/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 4bd2890..69b4584 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -266,7 +266,7 @@ public class KafkaStreamsTest {
             streams.setGlobalStateRestoreListener(new 
MockStateRestoreListener());
             fail("Should throw an IllegalStateException");
         } catch (final IllegalStateException e) {
-            Assert.assertEquals("Can only set the GlobalRestoreListener in the 
CREATED state", e.getMessage());
+            // expected
         } finally {
             streams.close();
         }
@@ -279,7 +279,7 @@ public class KafkaStreamsTest {
             streams.setUncaughtExceptionHandler(null);
             fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
-            Assert.assertEquals("Can only set UncaughtExceptionHandler in 
CREATED state.", e.getMessage());
+            // expected
         } finally {
             streams.close();
         }
@@ -292,7 +292,7 @@ public class KafkaStreamsTest {
             streams.setStateListener(null);
             fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
-            Assert.assertEquals("Can only set StateListener in CREATED 
state.", e.getMessage());
+            // expected
         } finally {
             streams.close();
         }
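
The KafkaStreams.java hunks above all follow one pattern: a setter is accepted only in the CREATED state, and the exception message now reports the current state. The tests stop asserting the exact message, plausibly because the text now varies with the state. The sketch below reproduces that pattern in isolation. LifecycleSketch, its State values and its method names are hypothetical stand-ins, not the KafkaStreams class, and the message text is only modelled on the one in the diff.

```java
// Hypothetical stand-in for a client with a lifecycle; NOT the KafkaStreams class.
final class LifecycleSketch {
    enum State { CREATED, RUNNING, NOT_RUNNING }

    private State state = State.CREATED;
    private Runnable listener;

    // Accepts a listener only before start(), mirroring the guard in the diff.
    void setListener(Runnable listener) {
        if (state == State.CREATED) {
            this.listener = listener;
        } else {
            throw new IllegalStateException("Can only set listener in CREATED state. " +
                    "Current state is: " + state);
        }
    }

    void start() {
        state = State.RUNNING;
    }

    void close() {
        state = State.NOT_RUNNING;
    }

    // Same shape as the updated tests: expect the exception type, not its text.
    static String messageWhenSetAfterStart() {
        LifecycleSketch client = new LifecycleSketch();
        client.setListener(() -> { }); // allowed: still in CREATED
        client.start();
        try {
            client.setListener(null);
            return null; // not reached: the guard throws once the client is running
        } catch (IllegalStateException expected) {
            return expected.getMessage();
        } finally {
            client.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageWhenSetAfterStart());
    }
}
```

Including the current state in the message makes a misordered call easier to diagnose, at the cost of a message that is no longer a fixed string.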
