vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r669947367



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
               <code class="docutils literal"><span 
class="pre">close()</span></code> method. Note that Kafka Streams may re-use a 
single
               <code class="docutils literal"><span 
class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span 
class="pre">init()</span></code> on it again after <code class="docutils 
literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-              (1) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">#process()</span></code> the output record inherits 
the input record timestamp.
-              (2) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">punctuate()</span></code></p> the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-              Note, that <code class="docutils literal"><span 
class="pre">#forward()</span></code> also allows to change the default behavior 
by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> accepts a user <code 
class="docutils literal"><span class="pre">Punctuator</span></code> callback 
interface, which triggers its <code class="docutils literal"><span 
class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils 
literal"><span class="pre">PunctuationType</span></code>. The <code 
class="docutils literal"><span class="pre">PunctuationType</span></code> 
determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface takes two sets of generic 
parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, 
VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code 
class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> 
define the key and value types that will be passed
+            to <code class="docutils literal"><span 
class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span 
class="pre">KOut</span></code> and <code class="docutils literal"><span 
class="pre">VOut</span></code>
+            define the forwarded key and value types that <code 
class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all 
(or if it only forwards
+            <code class="docutils literal"><span 
class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span 
class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common 
superclass, you will
+            have to set the output generic type argument to <code 
class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils 
literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a 
Kafka record:
+            the key, value, timestamp and headers. When forwarding records, 
you can use the
+            constructor to create a new <code class="docutils literal"><span 
class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to 
replace one of the
+            <code class="docutils literal"><span 
class="pre">Record</span></code>'s properties
+            and copy over the rest. For example,
+            <code class="docutils literal"><span 
class="pre">inputRecord.withValue(newValue)</span></code>
+            would copy the key, timestamp, and headers from
+            <code class="docutils literal"><span 
class="pre">inputRecord</span></code> while
+            setting the output record's value to <code class="docutils 
literal"><span class="pre">newValue</span></code>.
+            Note that this does not mutate <code class="docutils 
literal"><span class="pre">inputRecord</span></code>,
+            but instead creates a shallow copy. Beware that this is only a 
shallow copy, so if you
+            plan to mutate the key, value, or headers elsewhere in the 
program, you will want to
+            create a deep copy of those fields yourself.
+          </p>
+            <p>
+              In addition to handling incoming records via
+              <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>,
+              you have the option to schedule periodic invocation (called 
"punctuation")
+              in your processor's <code class="docutils literal"><span 
class="pre">init()</span></code>

Review comment:
       Hmm, that actually does sounds really useful. I never thought of it 
before. I'll file a ticket to document this use case. Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to