vvcephei commented on a change in pull request #10994: URL: https://github.com/apache/kafka/pull/10994#discussion_r669947367
########## File path: docs/streams/developer-guide/processor-api.html ##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
 <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single <code class="docutils literal"><span class="pre">Processor</span></code> object by calling <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
- <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
- (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
- (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
- Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
- <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
- API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>.
The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+ <p>
+ The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
+ <code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
+ that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
+ <code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
+ to <code class="docutils literal"><span class="pre">process()</span></code>.
+ Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
+ define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+ will accept. If your processor does not forward any records at all (or if it only forwards
+ <code class="docutils literal"><span class="pre">null</span></code> keys or values),
+ a best practice is to set the output generic type argument to
+ <code class="docutils literal"><span class="pre">Void</span></code>.
+ If it needs to forward multiple types that don't share a common superclass, you will
+ have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
+ </p>
+ <p>
+ Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
+ and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
+ methods handle records in the form of the <code class="docutils literal"><span class="pre">Record<K, V></span></code>
+ data class. This class gives you access to the key components of a Kafka record:
+ the key, value, timestamp and headers.
When forwarding records, you can use the
+ constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
+ from scratch, or you can use the convenience builder methods to replace one of the
+ <code class="docutils literal"><span class="pre">Record</span></code>'s properties
+ and copy over the rest. For example,
+ <code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code>
+ would copy the key, timestamp, and headers from
+ <code class="docutils literal"><span class="pre">inputRecord</span></code> while
+ setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>.
+ Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
+ but instead creates a shallow copy. Beware that this is only a shallow copy, so if you
+ plan to mutate the key, value, or headers elsewhere in the program, you will want to
+ create a deep copy of those fields yourself.
+ </p>
+ <p>
+ In addition to handling incoming records via
+ <code class="docutils literal"><span class="pre">Processor#process()</span></code>,
+ you have the option to schedule periodic invocation (called "punctuation")
+ in your processor's <code class="docutils literal"><span class="pre">init()</span></code>

Review comment:
Hmm, that actually does sound really useful. I never thought of it before. I'll file a ticket to document this use case. Thanks!