mjsax commented on code in PR #18314:
URL: https://github.com/apache/kafka/pull/18314#discussion_r1913737465
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -64,7 +64,7 @@
</li>
</ul>
</li>
- <li><a class="reference internal"
href="#applying-processors-and-transformers-processor-api-integration"
id="id24">Applying processors and transformers (Processor API
integration)</a></li>
+ <li><a class="reference internal"
href="#migrating-from-transform-methods-to-processor-api-papi"
id="id37">Migrating from transform Methods to Processor API (PAPI)</a></li>
Review Comment:
Would it be better to update the old section to `Applying processors
(Processor API integration)` and add a new section `Migrating from transform
Methods to Processor API (PAPI)` ?
We would eventually remove the "migration section" (and maybe even back-port
to 3.9/3.8 docs, with some adjustments), but keep the updated "PAPI
integration".
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3097,152 +3097,615 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
</div>
</div>
</div>
- <div class="section"
id="applying-processors-and-transformers-processor-api-integration">
- <span id="streams-developer-guide-dsl-process"></span><h3><a
class="toc-backref" href="#id24">Applying processors and transformers
(Processor API integration)</a><a class="headerlink"
href="#applying-processors-and-transformers-processor-api-integration"
title="Permalink to this headline"></a></h3>
- <p>Beyond the aforementioned <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateless</span></a> and
- <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateful</span></a> transformations, you may also
- leverage the <a class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- There are a number of scenarios where this may be
helpful:</p>
- <ul class="simple">
- <li><strong>Customization:</strong> You need to implement
special, customized logic that is not or not yet available in the DSL.</li>
- <li><strong>Combining ease-of-use with full flexibility
where it’s needed:</strong> Even though you generally prefer to use
- the expressiveness of the DSL, there are certain steps
in your processing that require more flexibility and
- tinkering than the DSL provides. For example, only
the Processor API provides access to a
- record’s metadata such as its topic, partition,
and offset information.
- However, you don’t want to switch completely to
the Processor API just because of that.</li>
- <li><strong>Migrating from other tools:</strong> You are
migrating from other stream processing technologies that provide an
- imperative API, and migrating some of your legacy code
to the Processor API was faster and/or easier than to
- migrate completely to the DSL right away.</li>
- </ul>
- <table border="1" class="non-scrolling-table width-100-percent
docutils">
- <colgroup>
- <col width="19%" />
- <col width="81%" />
- </colgroup>
- <thead valign="bottom">
- <tr class="row-odd"><th class="head">Transformation</th>
- <th class="head">Description</th>
- </tr>
- </thead>
- <tbody valign="top">
- <tr class="row-even"><td><p
class="first"><strong>Process</strong></p>
- <ul class="last simple">
- <li>KStream -> void</li>
- </ul>
- </td>
- <td><p class="first"><strong>Terminal
operation.</strong> Applies a <code class="docutils literal"><span
class="pre">Processor</span></code> to each record.
- <code class="docutils literal"><span
class="pre">process()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">details</a>)</p>
- <p>This is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Processor</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- <tr class="row-odd"><td><p
class="first"><strong>Transform</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">Transformer</span></code> to each record.
- <code class="docutils literal"><span
class="pre">transform()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into zero,
one, or more output records (similar to the stateless <code class="docutils
literal"><span class="pre">flatMap</span></code>).
- The <code class="docutils literal"><span
class="pre">Transformer</span></code> must return <code class="docutils
literal"><span class="pre">null</span></code> for zero output.
- You can modify the record’s key and
value, including their types.</p>
- <p><strong>Marks the stream for data
re-partitioning:</strong>
- Applying a grouping or a join after <code
class="docutils literal"><span class="pre">transform</span></code> will result
in re-partitioning of the records.
- If possible use <code class="docutils
literal"><span class="pre">transformValues</span></code> instead, which will
not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transform</span></code> is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Transformer</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">javadocs</a>.
- </p>
- </td>
- </tr>
- <tr class="row-even"><td><p
class="first"><strong>Transform (values only)</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- <li>KTable -> KTable</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">ValueTransformer</span></code> to each record, while
retaining the key of the original record.
- <code class="docutils literal"><span
class="pre">transformValues()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into exactly
one output record (zero output records or multiple output records are not
possible).
- The <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> may return <code class="docutils
literal"><span class="pre">null</span></code> as the new value for a record.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is preferable to <code
class="docutils literal"><span class="pre">transform</span></code> because it
will not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is essentially equivalent to adding
the <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> via <code class="docutils
literal"><span class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- </tbody>
- </table>
- <p>The following example shows how to leverage, via the <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method, a custom <code class="docutils literal"><span
class="pre">Processor</span></code> that sends an
- email notification whenever a page view count reaches a
predefined threshold.</p>
- <p>First, we need to implement a custom stream processor,
<code class="docutils literal"><span
class="pre">PopularPageEmailAlert</span></code>, that implements the <code
class="docutils literal"><span class="pre">Processor</span></code>
- interface:</p>
- <pre class="line-numbers"><code class="language-java">// A
processor that sends an alert message about a popular page to a configurable
email address
-public class PopularPageEmailAlert implements Processor<PageId, Long, Void,
Void> {
-
- private final String emailAddress;
- private ProcessorContext<Void, Void> context;
-
- public PopularPageEmailAlert(String emailAddress) {
- this.emailAddress = emailAddress;
- }
+ <div class="section"
id="migrating-from-transform-methods-to-processor-api-papi">
Review Comment:
Why is the indention change?
The last line before is
```
</div>
```
But is one is only
```
<div class="section"
id="migrating-from-transform-methods-to-processor-api-papi">
```
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3097,152 +3097,615 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
</div>
</div>
</div>
- <div class="section"
id="applying-processors-and-transformers-processor-api-integration">
- <span id="streams-developer-guide-dsl-process"></span><h3><a
class="toc-backref" href="#id24">Applying processors and transformers
(Processor API integration)</a><a class="headerlink"
href="#applying-processors-and-transformers-processor-api-integration"
title="Permalink to this headline"></a></h3>
- <p>Beyond the aforementioned <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateless</span></a> and
- <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateful</span></a> transformations, you may also
- leverage the <a class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- There are a number of scenarios where this may be
helpful:</p>
- <ul class="simple">
- <li><strong>Customization:</strong> You need to implement
special, customized logic that is not or not yet available in the DSL.</li>
- <li><strong>Combining ease-of-use with full flexibility
where it’s needed:</strong> Even though you generally prefer to use
- the expressiveness of the DSL, there are certain steps
in your processing that require more flexibility and
- tinkering than the DSL provides. For example, only
the Processor API provides access to a
- record’s metadata such as its topic, partition,
and offset information.
- However, you don’t want to switch completely to
the Processor API just because of that.</li>
- <li><strong>Migrating from other tools:</strong> You are
migrating from other stream processing technologies that provide an
- imperative API, and migrating some of your legacy code
to the Processor API was faster and/or easier than to
- migrate completely to the DSL right away.</li>
- </ul>
- <table border="1" class="non-scrolling-table width-100-percent
docutils">
- <colgroup>
- <col width="19%" />
- <col width="81%" />
- </colgroup>
- <thead valign="bottom">
- <tr class="row-odd"><th class="head">Transformation</th>
- <th class="head">Description</th>
- </tr>
- </thead>
- <tbody valign="top">
- <tr class="row-even"><td><p
class="first"><strong>Process</strong></p>
- <ul class="last simple">
- <li>KStream -> void</li>
- </ul>
- </td>
- <td><p class="first"><strong>Terminal
operation.</strong> Applies a <code class="docutils literal"><span
class="pre">Processor</span></code> to each record.
- <code class="docutils literal"><span
class="pre">process()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">details</a>)</p>
- <p>This is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Processor</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- <tr class="row-odd"><td><p
class="first"><strong>Transform</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">Transformer</span></code> to each record.
- <code class="docutils literal"><span
class="pre">transform()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into zero,
one, or more output records (similar to the stateless <code class="docutils
literal"><span class="pre">flatMap</span></code>).
- The <code class="docutils literal"><span
class="pre">Transformer</span></code> must return <code class="docutils
literal"><span class="pre">null</span></code> for zero output.
- You can modify the record’s key and
value, including their types.</p>
- <p><strong>Marks the stream for data
re-partitioning:</strong>
- Applying a grouping or a join after <code
class="docutils literal"><span class="pre">transform</span></code> will result
in re-partitioning of the records.
- If possible use <code class="docutils
literal"><span class="pre">transformValues</span></code> instead, which will
not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transform</span></code> is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Transformer</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">javadocs</a>.
- </p>
- </td>
- </tr>
- <tr class="row-even"><td><p
class="first"><strong>Transform (values only)</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- <li>KTable -> KTable</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">ValueTransformer</span></code> to each record, while
retaining the key of the original record.
- <code class="docutils literal"><span
class="pre">transformValues()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into exactly
one output record (zero output records or multiple output records are not
possible).
- The <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> may return <code class="docutils
literal"><span class="pre">null</span></code> as the new value for a record.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is preferable to <code
class="docutils literal"><span class="pre">transform</span></code> because it
will not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is essentially equivalent to adding
the <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> via <code class="docutils
literal"><span class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- </tbody>
- </table>
- <p>The following example shows how to leverage, via the <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method, a custom <code class="docutils literal"><span
class="pre">Processor</span></code> that sends an
- email notification whenever a page view count reaches a
predefined threshold.</p>
- <p>First, we need to implement a custom stream processor,
<code class="docutils literal"><span
class="pre">PopularPageEmailAlert</span></code>, that implements the <code
class="docutils literal"><span class="pre">Processor</span></code>
- interface:</p>
- <pre class="line-numbers"><code class="language-java">// A
processor that sends an alert message about a popular page to a configurable
email address
-public class PopularPageEmailAlert implements Processor<PageId, Long, Void,
Void> {
-
- private final String emailAddress;
- private ProcessorContext<Void, Void> context;
-
- public PopularPageEmailAlert(String emailAddress) {
- this.emailAddress = emailAddress;
- }
+ <div class="section"
id="migrating-from-transform-methods-to-processor-api-papi">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-methods-to-processor-api-papi"
+ title="Permalink to this headline">
+ Migrating from transform Methods to Processor API (PAPI)
+ </a>
+ </h2>
+ <h3>Overview of Changes</h3>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka Streams
API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>, and
<code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the more
versatile Processor API. This
+ guide provides detailed steps for migrating existing code to use
the new Processor API and
+ explains the benefits of the changes.
+ </p>
+ <p>The following deprecated methods are no longer available in Kafka
Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The Processor API now serves as a unified replacement for all these
methods. It simplifies the
+ API surface while maintaining support for both stateless and
stateful operations.</p>
+ <h3>Migration Process</h3>
+ <p>The migration process consists of:</p>
+ <ol>
+ <li>
+ Replace <code>Transformer</code> with <code>Processor</code>
or <code>ValueTransformer</code> with
+ <code>FixedKeyProcessor</code>;
+ </li>
+ <li>
+ Replace record <code>key</code> and <code>value</code> with
<code>Record</code> or <code>FixedKeyRecord</code>;
+ </li>
+ <li>
+ Rewrite the <code>transform</code> method of
<code>Transformer</code> and <code>ValueTransformer</code> as
+ <code>process</code> or <code>processValues</code>;
+ </li>
+ <li>
+ Use the new <code>Record</code> or <code>FixedKeyRecord</code>
as argument of the renamed method;</li>
+ <li>
+ Rewrite the return type of the renamed method to
<code>void</code> and forward the record through the context;
+ and finally
+ </li>
+ <li>
+ Change the <code>KStream</code> call of the
<code>transform</code> method to <code>process</code> or
+ <code>processValues</code>.
+ </li>
+ </ol>
+ <h3>Migration Examples</h3>
+ <p>
+ To migrate from the deprecated <code>transform</code>,
<code>transformValues</code>, <code>flatTransform</code>, and
+ <code>flatTransformValues</code> methods to the Process API (PAPI)
in Kafka Streams, follow these examples. The new
+ <code>process</code> and <code>processValues</code> APIs enable a
more flexible and reusable approach by requiring
+ implementations of the <code>Processor</code> or
<code>FixedKeyProcessor</code> interfaces.
+ </p>
+ <p>Here are examples to help you migrate:</p>
+ <table>
+ <thead>
+ <tr>
+ <th>Example</th>
+ <th>Migrating from</th>
+ <th>Migrating to</th>
+ <th>State Type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><a
href="#cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a
Loyalty Program</a></td>
+ <td><code>transform</code></td>
+ <td><code>process</code></td>
+ <td>Stateful</td>
+ </tr>
+ <tr>
+ <td><a href="#categorizing-logs-by-severity">Categorizing Logs
by Severity</a></td>
+ <td><code>flatTransform</code></td>
+ <td><code>process</code></td>
+ <td>Stateless</td>
+ </tr>
+ <tr>
+ <td><a href="#traffic-radar-monitoring-car-count">Traffic
Radar Monitoring Car Count</a></td>
+ <td><code>transformValues</code></td>
+ <td><code>processValues</code></td>
+ <td>Stateful</td>
+ </tr>
+ <tr>
+ <td><a href="#replacing-slang-in-text-messages">Replacing
Slang in Text Messages</a></td>
+ <td><code>flatTransformValues</code></td>
+ <td><code>processValues</code></td>
+ <td>Stateless</td>
+ </tr>
+ </tbody>
+ </table>
+ <h4>Stateless Examples</h4>
+ <h5 id="categorizing-logs-by-severity">Categorizing Logs by
Severity</h5>
+ <ul>
+ <li>
+ <strong>Idea:</strong> You have a stream of log messages. Each
message contains a severity level (e.g., INFO,
+ WARN, ERROR) in the value. The processor filters messages,
routing ERROR messages to a dedicated topic and
+ discarding INFO messages. The rest (WARN) are forwarded to
another processor.
Review Comment:
> The rest (WARN) are forwarded to another processor.
I don't see this in the example? Seem `WARN` is going into some other topic,
using a `TopicNameExtractor` ?
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3097,152 +3097,615 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
</div>
</div>
</div>
- <div class="section"
id="applying-processors-and-transformers-processor-api-integration">
- <span id="streams-developer-guide-dsl-process"></span><h3><a
class="toc-backref" href="#id24">Applying processors and transformers
(Processor API integration)</a><a class="headerlink"
href="#applying-processors-and-transformers-processor-api-integration"
title="Permalink to this headline"></a></h3>
- <p>Beyond the aforementioned <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateless</span></a> and
- <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateful</span></a> transformations, you may also
- leverage the <a class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- There are a number of scenarios where this may be
helpful:</p>
- <ul class="simple">
- <li><strong>Customization:</strong> You need to implement
special, customized logic that is not or not yet available in the DSL.</li>
- <li><strong>Combining ease-of-use with full flexibility
where it’s needed:</strong> Even though you generally prefer to use
- the expressiveness of the DSL, there are certain steps
in your processing that require more flexibility and
- tinkering than the DSL provides. For example, only
the Processor API provides access to a
- record’s metadata such as its topic, partition,
and offset information.
- However, you don’t want to switch completely to
the Processor API just because of that.</li>
- <li><strong>Migrating from other tools:</strong> You are
migrating from other stream processing technologies that provide an
- imperative API, and migrating some of your legacy code
to the Processor API was faster and/or easier than to
- migrate completely to the DSL right away.</li>
- </ul>
- <table border="1" class="non-scrolling-table width-100-percent
docutils">
- <colgroup>
- <col width="19%" />
- <col width="81%" />
- </colgroup>
- <thead valign="bottom">
- <tr class="row-odd"><th class="head">Transformation</th>
- <th class="head">Description</th>
- </tr>
- </thead>
- <tbody valign="top">
- <tr class="row-even"><td><p
class="first"><strong>Process</strong></p>
- <ul class="last simple">
- <li>KStream -> void</li>
- </ul>
- </td>
- <td><p class="first"><strong>Terminal
operation.</strong> Applies a <code class="docutils literal"><span
class="pre">Processor</span></code> to each record.
- <code class="docutils literal"><span
class="pre">process()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">details</a>)</p>
- <p>This is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Processor</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- <tr class="row-odd"><td><p
class="first"><strong>Transform</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">Transformer</span></code> to each record.
- <code class="docutils literal"><span
class="pre">transform()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into zero,
one, or more output records (similar to the stateless <code class="docutils
literal"><span class="pre">flatMap</span></code>).
- The <code class="docutils literal"><span
class="pre">Transformer</span></code> must return <code class="docutils
literal"><span class="pre">null</span></code> for zero output.
- You can modify the record’s key and
value, including their types.</p>
- <p><strong>Marks the stream for data
re-partitioning:</strong>
- Applying a grouping or a join after <code
class="docutils literal"><span class="pre">transform</span></code> will result
in re-partitioning of the records.
- If possible use <code class="docutils
literal"><span class="pre">transformValues</span></code> instead, which will
not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transform</span></code> is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Transformer</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">javadocs</a>.
- </p>
- </td>
- </tr>
- <tr class="row-even"><td><p
class="first"><strong>Transform (values only)</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- <li>KTable -> KTable</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">ValueTransformer</span></code> to each record, while
retaining the key of the original record.
- <code class="docutils literal"><span
class="pre">transformValues()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into exactly
one output record (zero output records or multiple output records are not
possible).
- The <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> may return <code class="docutils
literal"><span class="pre">null</span></code> as the new value for a record.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is preferable to <code
class="docutils literal"><span class="pre">transform</span></code> because it
will not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is essentially equivalent to adding
the <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> via <code class="docutils
literal"><span class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- </tbody>
- </table>
- <p>The following example shows how to leverage, via the <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method, a custom <code class="docutils literal"><span
class="pre">Processor</span></code> that sends an
- email notification whenever a page view count reaches a
predefined threshold.</p>
- <p>First, we need to implement a custom stream processor,
<code class="docutils literal"><span
class="pre">PopularPageEmailAlert</span></code>, that implements the <code
class="docutils literal"><span class="pre">Processor</span></code>
- interface:</p>
- <pre class="line-numbers"><code class="language-java">// A
processor that sends an alert message about a popular page to a configurable
email address
-public class PopularPageEmailAlert implements Processor<PageId, Long, Void,
Void> {
-
- private final String emailAddress;
- private ProcessorContext<Void, Void> context;
-
- public PopularPageEmailAlert(String emailAddress) {
- this.emailAddress = emailAddress;
- }
+ <div class="section"
id="migrating-from-transform-methods-to-processor-api-papi">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-methods-to-processor-api-papi"
+ title="Permalink to this headline">
+ Migrating from transform Methods to Processor API (PAPI)
+ </a>
+ </h2>
+ <h3>Overview of Changes</h3>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka Streams
API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>, and
<code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the more
versatile Processor API. This
+ guide provides detailed steps for migrating existing code to use
the new Processor API and
+ explains the benefits of the changes.
+ </p>
+ <p>The following deprecated methods are no longer available in Kafka
Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The Processor API now serves as a unified replacement for all these
methods. It simplifies the
+ API surface while maintaining support for both stateless and
stateful operations.</p>
+ <h3>Migration Process</h3>
+ <p>The migration process consists of:</p>
+ <ol>
+ <li>
+ Replace <code>Transformer</code> with <code>Processor</code>
or <code>ValueTransformer</code> with
+ <code>FixedKeyProcessor</code>;
+ </li>
+ <li>
+ Replace record <code>key</code> and <code>value</code> with
<code>Record</code> or <code>FixedKeyRecord</code>;
+ </li>
+ <li>
+ Rewrite the <code>transform</code> method of
<code>Transformer</code> and <code>ValueTransformer</code> as
+ <code>process</code> or <code>processValues</code>;
+ </li>
+ <li>
+ Use the new <code>Record</code> or <code>FixedKeyRecord</code>
as argument of the renamed method;</li>
+ <li>
+ Rewrite the return type of the renamed method to
<code>void</code> and forward the record through the context;
+ and finally
+ </li>
+ <li>
+ Change the <code>KStream</code> call of the
<code>transform</code> method to <code>process</code> or
+ <code>processValues</code>.
+ </li>
+ </ol>
+ <h3>Migration Examples</h3>
+ <p>
+ To migrate from the deprecated <code>transform</code>,
<code>transformValues</code>, <code>flatTransform</code>, and
+ <code>flatTransformValues</code> methods to the Process API (PAPI)
in Kafka Streams, follow these examples. The new
+ <code>process</code> and <code>processValues</code> APIs enable a
more flexible and reusable approach by requiring
+ implementations of the <code>Processor</code> or
<code>FixedKeyProcessor</code> interfaces.
+ </p>
+ <p>Here are examples to help you migrate:</p>
+ <table>
+ <thead>
+ <tr>
+ <th>Example</th>
+ <th>Migrating from</th>
+ <th>Migrating to</th>
+ <th>State Type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><a
href="#cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a
Loyalty Program</a></td>
+ <td><code>transform</code></td>
+ <td><code>process</code></td>
+ <td>Stateful</td>
+ </tr>
+ <tr>
+ <td><a href="#categorizing-logs-by-severity">Categorizing Logs
by Severity</a></td>
+ <td><code>flatTransform</code></td>
+ <td><code>process</code></td>
+ <td>Stateless</td>
+ </tr>
+ <tr>
+ <td><a href="#traffic-radar-monitoring-car-count">Traffic
Radar Monitoring Car Count</a></td>
+ <td><code>transformValues</code></td>
+ <td><code>processValues</code></td>
+ <td>Stateful</td>
+ </tr>
+ <tr>
+ <td><a href="#replacing-slang-in-text-messages">Replacing
Slang in Text Messages</a></td>
+ <td><code>flatTransformValues</code></td>
+ <td><code>processValues</code></td>
+ <td>Stateless</td>
+ </tr>
+ </tbody>
+ </table>
+ <h4>Stateless Examples</h4>
+ <h5 id="categorizing-logs-by-severity">Categorizing Logs by
Severity</h5>
+ <ul>
+ <li>
+ <strong>Idea:</strong> You have a stream of log messages. Each
message contains a severity level (e.g., INFO,
+ WARN, ERROR) in the value. The processor filters messages,
routing ERROR messages to a dedicated topic and
+ discarding INFO messages. The rest (WARN) are forwarded to
another processor.
+ </li>
+ <li>
+ <strong>Real-World Context:</strong> In a production
monitoring system, categorizing logs by severity ensures
+ ERROR logs are sent to a critical incident management system,
WARN logs are analyzed for potential risks, and
+ INFO logs are stored for basic reporting purposes.
+ </li>
+ </ul>
+ <p>
+ Below, methods <code>categorizeWithFlatTransform</code> and
<code>categorizeWithProcess</code> show how you can
+ migrate from <code>flatTransform</code> to <code>process</code>.
+ </p>
+ <pre class="line-numbers"><code class="language-java">package
org.apache.kafka.streams.kstream;
- @Override
- public void init(ProcessorContext<Void, Void> context) {
- this.context = context;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+
+import java.util.Collections;
+import java.util.List;
+
+public class CategorizingLogsBySeverityExample {
+ private static final String ERROR_LOGS_TOPIC =
"error-logs-topic";
+ private static final String INPUT_LOGS_TOPIC =
"input-logs-topic";
+ private static final String UNKNOWN_LOGS_TOPIC =
"unknown-logs-topic";
+ private static final String WARN_LOGS_TOPIC = "warn-logs-topic";
+
+ public static void categorizeWithFlatTransform(final StreamsBuilder
builder) {
+ final KStream<String, String> logStream =
builder.stream(INPUT_LOGS_TOPIC);
+ logStream.flatTransform(() -> new LogSeverityTransformer())
+ .to((key, value, recordContext) -> {
+ // Determine the target topic dynamically
+ if ("ERROR".equals(key)) return ERROR_LOGS_TOPIC;
+ if ("WARN".equals(key)) return WARN_LOGS_TOPIC;
+ return UNKNOWN_LOGS_TOPIC;
+ });
+ }
- // Here you would perform any additional initializations such as setting
up an email client.
- }
+ public static void categorizeWithProcess(final StreamsBuilder builder) {
+ final KStream<String, String> logStream =
builder.stream(INPUT_LOGS_TOPIC);
+ logStream.process(LogSeverityProcessor::new);
Review Comment:
`to(...)` is missing
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3097,152 +3097,615 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
</div>
</div>
</div>
- <div class="section"
id="applying-processors-and-transformers-processor-api-integration">
- <span id="streams-developer-guide-dsl-process"></span><h3><a
class="toc-backref" href="#id24">Applying processors and transformers
(Processor API integration)</a><a class="headerlink"
href="#applying-processors-and-transformers-processor-api-integration"
title="Permalink to this headline"></a></h3>
- <p>Beyond the aforementioned <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateless</span></a> and
- <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateful</span></a> transformations, you may also
- leverage the <a class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- There are a number of scenarios where this may be
helpful:</p>
- <ul class="simple">
- <li><strong>Customization:</strong> You need to implement
special, customized logic that is not or not yet available in the DSL.</li>
- <li><strong>Combining ease-of-use with full flexibility
where it’s needed:</strong> Even though you generally prefer to use
- the expressiveness of the DSL, there are certain steps
in your processing that require more flexibility and
- tinkering than the DSL provides. For example, only
the Processor API provides access to a
- record’s metadata such as its topic, partition,
and offset information.
- However, you don’t want to switch completely to
the Processor API just because of that.</li>
- <li><strong>Migrating from other tools:</strong> You are
migrating from other stream processing technologies that provide an
- imperative API, and migrating some of your legacy code
to the Processor API was faster and/or easier than to
- migrate completely to the DSL right away.</li>
- </ul>
- <table border="1" class="non-scrolling-table width-100-percent
docutils">
- <colgroup>
- <col width="19%" />
- <col width="81%" />
- </colgroup>
- <thead valign="bottom">
- <tr class="row-odd"><th class="head">Transformation</th>
- <th class="head">Description</th>
- </tr>
- </thead>
- <tbody valign="top">
- <tr class="row-even"><td><p
class="first"><strong>Process</strong></p>
- <ul class="last simple">
- <li>KStream -> void</li>
- </ul>
- </td>
- <td><p class="first"><strong>Terminal
operation.</strong> Applies a <code class="docutils literal"><span
class="pre">Processor</span></code> to each record.
- <code class="docutils literal"><span
class="pre">process()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">details</a>)</p>
- <p>This is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Processor</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- <tr class="row-odd"><td><p
class="first"><strong>Transform</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">Transformer</span></code> to each record.
- <code class="docutils literal"><span
class="pre">transform()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into zero,
one, or more output records (similar to the stateless <code class="docutils
literal"><span class="pre">flatMap</span></code>).
- The <code class="docutils literal"><span
class="pre">Transformer</span></code> must return <code class="docutils
literal"><span class="pre">null</span></code> for zero output.
- You can modify the record’s key and
value, including their types.</p>
- <p><strong>Marks the stream for data
re-partitioning:</strong>
- Applying a grouping or a join after <code
class="docutils literal"><span class="pre">transform</span></code> will result
in re-partitioning of the records.
- If possible use <code class="docutils
literal"><span class="pre">transformValues</span></code> instead, which will
not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transform</span></code> is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Transformer</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">javadocs</a>.
- </p>
- </td>
- </tr>
- <tr class="row-even"><td><p
class="first"><strong>Transform (values only)</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- <li>KTable -> KTable</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">ValueTransformer</span></code> to each record, while
retaining the key of the original record.
- <code class="docutils literal"><span
class="pre">transformValues()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into exactly
one output record (zero output records or multiple output records are not
possible).
- The <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> may return <code class="docutils
literal"><span class="pre">null</span></code> as the new value for a record.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is preferable to <code
class="docutils literal"><span class="pre">transform</span></code> because it
will not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is essentially equivalent to adding
the <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> via <code class="docutils
literal"><span class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- </tbody>
- </table>
- <p>The following example shows how to leverage, via the <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method, a custom <code class="docutils literal"><span
class="pre">Processor</span></code> that sends an
- email notification whenever a page view count reaches a
predefined threshold.</p>
- <p>First, we need to implement a custom stream processor,
<code class="docutils literal"><span
class="pre">PopularPageEmailAlert</span></code>, that implements the <code
class="docutils literal"><span class="pre">Processor</span></code>
- interface:</p>
- <pre class="line-numbers"><code class="language-java">// A
processor that sends an alert message about a popular page to a configurable
email address
-public class PopularPageEmailAlert implements Processor<PageId, Long, Void,
Void> {
-
- private final String emailAddress;
- private ProcessorContext<Void, Void> context;
-
- public PopularPageEmailAlert(String emailAddress) {
- this.emailAddress = emailAddress;
- }
+ <div class="section"
id="migrating-from-transform-methods-to-processor-api-papi">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-methods-to-processor-api-papi"
+ title="Permalink to this headline">
+ Migrating from transform Methods to Processor API (PAPI)
+ </a>
+ </h2>
+ <h3>Overview of Changes</h3>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka Streams
API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>, and
<code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the more
versatile Processor API. This
+ guide provides detailed steps for migrating existing code to use
the new Processor API and
+ explains the benefits of the changes.
+ </p>
+ <p>The following deprecated methods are no longer available in Kafka
Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The Processor API now serves as a unified replacement for all these
methods. It simplifies the
+ API surface while maintaining support for both stateless and
stateful operations.</p>
+ <h3>Migration Process</h3>
+ <p>The migration process consists of:</p>
+ <ol>
+ <li>
+ Replace <code>Transformer</code> with <code>Processor</code>
or <code>ValueTransformer</code> with
+ <code>FixedKeyProcessor</code>;
+ </li>
+ <li>
+ Replace record <code>key</code> and <code>value</code> with
<code>Record</code> or <code>FixedKeyRecord</code>;
+ </li>
+ <li>
+ Rewrite the <code>transform</code> method of
<code>Transformer</code> and <code>ValueTransformer</code> as
+ <code>process</code> or <code>processValues</code>;
+ </li>
+ <li>
+ Use the new <code>Record</code> or <code>FixedKeyRecord</code>
as argument of the renamed method;</li>
+ <li>
+ Rewrite the return type of the renamed method to
<code>void</code> and forward the record through the context;
+ and finally
+ </li>
+ <li>
+ Change the <code>KStream</code> call of the
<code>transform</code> method to <code>process</code> or
+ <code>processValues</code>.
+ </li>
+ </ol>
+ <h3>Migration Examples</h3>
+ <p>
+ To migrate from the deprecated <code>transform</code>,
<code>transformValues</code>, <code>flatTransform</code>, and
+ <code>flatTransformValues</code> methods to the Process API (PAPI)
in Kafka Streams, follow these examples. The new
+ <code>process</code> and <code>processValues</code> APIs enable a
more flexible and reusable approach by requiring
+ implementations of the <code>Processor</code> or
<code>FixedKeyProcessor</code> interfaces.
+ </p>
+ <p>Here are examples to help you migrate:</p>
+ <table>
+ <thead>
+ <tr>
+ <th>Example</th>
+ <th>Migrating from</th>
+ <th>Migrating to</th>
+ <th>State Type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><a
href="#cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a
Loyalty Program</a></td>
+ <td><code>transform</code></td>
+ <td><code>process</code></td>
+ <td>Stateful</td>
+ </tr>
+ <tr>
+ <td><a href="#categorizing-logs-by-severity">Categorizing Logs
by Severity</a></td>
+ <td><code>flatTransform</code></td>
+ <td><code>process</code></td>
+ <td>Stateless</td>
+ </tr>
+ <tr>
+ <td><a href="#traffic-radar-monitoring-car-count">Traffic
Radar Monitoring Car Count</a></td>
+ <td><code>transformValues</code></td>
+ <td><code>processValues</code></td>
+ <td>Stateful</td>
+ </tr>
+ <tr>
+ <td><a href="#replacing-slang-in-text-messages">Replacing
Slang in Text Messages</a></td>
+ <td><code>flatTransformValues</code></td>
+ <td><code>processValues</code></td>
+ <td>Stateless</td>
+ </tr>
+ </tbody>
+ </table>
+ <h4>Stateless Examples</h4>
+ <h5 id="categorizing-logs-by-severity">Categorizing Logs by
Severity</h5>
+ <ul>
+ <li>
+ <strong>Idea:</strong> You have a stream of log messages. Each
message contains a severity level (e.g., INFO,
+ WARN, ERROR) in the value. The processor filters messages,
routing ERROR messages to a dedicated topic and
+ discarding INFO messages. The rest (WARN) are forwarded to
another processor.
+ </li>
+ <li>
+ <strong>Real-World Context:</strong> In a production
monitoring system, categorizing logs by severity ensures
+ ERROR logs are sent to a critical incident management system,
WARN logs are analyzed for potential risks, and
+ INFO logs are stored for basic reporting purposes.
+ </li>
+ </ul>
+ <p>
+ Below, methods <code>categorizeWithFlatTransform</code> and
<code>categorizeWithProcess</code> show how you can
+ migrate from <code>flatTransform</code> to <code>process</code>.
+ </p>
+ <pre class="line-numbers"><code class="language-java">package
org.apache.kafka.streams.kstream;
- @Override
- public void init(ProcessorContext<Void, Void> context) {
- this.context = context;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+
+import java.util.Collections;
+import java.util.List;
+
+public class CategorizingLogsBySeverityExample {
+ private static final String ERROR_LOGS_TOPIC =
"error-logs-topic";
+ private static final String INPUT_LOGS_TOPIC =
"input-logs-topic";
+ private static final String UNKNOWN_LOGS_TOPIC =
"unknown-logs-topic";
+ private static final String WARN_LOGS_TOPIC = "warn-logs-topic";
+
+ public static void categorizeWithFlatTransform(final StreamsBuilder
builder) {
+ final KStream<String, String> logStream =
builder.stream(INPUT_LOGS_TOPIC);
+ logStream.flatTransform(() -> new LogSeverityTransformer())
+ .to((key, value, recordContext) -> {
+ // Determine the target topic dynamically
+ if ("ERROR".equals(key)) return ERROR_LOGS_TOPIC;
+ if ("WARN".equals(key)) return WARN_LOGS_TOPIC;
+ return UNKNOWN_LOGS_TOPIC;
+ });
+ }
- // Here you would perform any additional initializations such as setting
up an email client.
- }
+ public static void categorizeWithProcess(final StreamsBuilder builder) {
+ final KStream<String, String> logStream =
builder.stream(INPUT_LOGS_TOPIC);
+ logStream.process(LogSeverityProcessor::new);
+ }
- @Override
- void process(Record<PageId, Long> record) {
- // Here you would format and send the alert email.
- //
- // In this specific example, you would be able to include
- // information about the page's ID and its view count
- }
+ private static class LogSeverityTransformer implements
Transformer<String, String, Iterable<KeyValue<String,
String>>> {
+ @Override
+ public void init(org.apache.kafka.streams.processor.ProcessorContext
context) {
+ }
- @Override
- void close() {
- // Any code for clean up would go here, for example tearing down the email
client and anything
- // else you created in the init() method
- // This processor instance will not be used again after this call.
- }
+ @Override
+ public Iterable<KeyValue<String, String>> transform(String
key, String value) {
+ if (value == null) {
+ return Collections.emptyList(); // Skip null values
+ }
-}</code></pre>
- <div class="admonition tip">
- <p><b>Tip</b></p>
- <p class="last">Even though we do not demonstrate it in
this example, a stream processor can access any available state stores by
- calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>.
- State stores are only available if they have been
connected to the processor, or if they are global stores. While global stores
do not need to be connected explicitly, they only allow for read-only access.
- There are two ways to connect state stores to a
processor:
- <ul class="simple">
- <li>By passing the name of a store that has already
been added via <code class="docutils literal"><span
class="pre">Topology#addStateStore()</span></code> to the corresponding <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method call.</li>
- <li>Implementing <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
- passed to <code class="docutils literal"><span
class="pre">KStream#process()</span></code>. In this case there is no need to
call <code class="docutils literal"><span
class="pre">StreamsBuilder#addStateStore()</span></code>
- beforehand, the store will be automatically added
for you. You can also implement <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the
- <code class="docutils literal"><span
class="pre">Value*</span></code> or <code class="docutils literal"><span
class="pre">*WithKey</span></code> supplier variants, or <code class="docutils
literal"><span class="pre">TransformerSupplier</span></code> or any of its
variants.
- </li>
- </ul>
- </div>
- <p>Then we can leverage the <code class="docutils
literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the
DSL via <code class="docutils literal"><span
class="pre">KStream#process</span></code>.</p>
- <pre class="line-numbers"><code
class="language-java">KStream<String, GenericRecord> pageViews = ...;
-
-// Send an email notification when the view count of a page reaches one
thousand.
-pageViews.groupByKey()
- .count()
- .filter((PageId pageId, Long viewCount) -> viewCount == 1000)
- // PopularPageEmailAlert is your custom processor that implements the
- // `Processor` interface, see further down below.
- .process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
- </div>
+ // Assume the severity is the first word in the log message
+ // For example: "ERROR: Disk not found" ->
"ERROR"
+ int colonIndex = value.indexOf(':');
+ String severity = colonIndex > 0 ? value.substring(0,
colonIndex).trim() : "UNKNOWN";
+
+ // Create appropriate KeyValue pair based on severity
+ return switch (severity) {
+ case "ERROR" -> List.of(new
KeyValue<>("ERROR", value));
+ case "WARN" -> List.of(new
KeyValue<>("WARN", value));
+ case "INFO" -> Collections.emptyList(); // INFO
logs are ignored
+ default -> List.of(new
KeyValue<>("UNKNOWN", value));
+ };
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+
+ private static class LogSeverityProcessor implements Processor<String,
String, String, String> {
Review Comment:
We could use `ContextualProcessor` instead? Thoughts? (would also us to
simplify the code below as we would get the `context` set automatically.
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3097,152 +3097,615 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
</div>
</div>
</div>
- <div class="section"
id="applying-processors-and-transformers-processor-api-integration">
- <span id="streams-developer-guide-dsl-process"></span><h3><a
class="toc-backref" href="#id24">Applying processors and transformers
(Processor API integration)</a><a class="headerlink"
href="#applying-processors-and-transformers-processor-api-integration"
title="Permalink to this headline"></a></h3>
- <p>Beyond the aforementioned <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateless</span></a> and
- <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateful</span></a> transformations, you may also
- leverage the <a class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- There are a number of scenarios where this may be
helpful:</p>
- <ul class="simple">
- <li><strong>Customization:</strong> You need to implement
special, customized logic that is not or not yet available in the DSL.</li>
- <li><strong>Combining ease-of-use with full flexibility
where it’s needed:</strong> Even though you generally prefer to use
- the expressiveness of the DSL, there are certain steps
in your processing that require more flexibility and
- tinkering than the DSL provides. For example, only
the Processor API provides access to a
- record’s metadata such as its topic, partition,
and offset information.
- However, you don’t want to switch completely to
the Processor API just because of that.</li>
- <li><strong>Migrating from other tools:</strong> You are
migrating from other stream processing technologies that provide an
- imperative API, and migrating some of your legacy code
to the Processor API was faster and/or easier than to
- migrate completely to the DSL right away.</li>
- </ul>
- <table border="1" class="non-scrolling-table width-100-percent
docutils">
- <colgroup>
- <col width="19%" />
- <col width="81%" />
- </colgroup>
- <thead valign="bottom">
- <tr class="row-odd"><th class="head">Transformation</th>
- <th class="head">Description</th>
- </tr>
- </thead>
- <tbody valign="top">
- <tr class="row-even"><td><p
class="first"><strong>Process</strong></p>
- <ul class="last simple">
- <li>KStream -> void</li>
- </ul>
- </td>
- <td><p class="first"><strong>Terminal
operation.</strong> Applies a <code class="docutils literal"><span
class="pre">Processor</span></code> to each record.
- <code class="docutils literal"><span
class="pre">process()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">details</a>)</p>
- <p>This is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Processor</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- <tr class="row-odd"><td><p
class="first"><strong>Transform</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">Transformer</span></code> to each record.
- <code class="docutils literal"><span
class="pre">transform()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into zero,
one, or more output records (similar to the stateless <code class="docutils
literal"><span class="pre">flatMap</span></code>).
- The <code class="docutils literal"><span
class="pre">Transformer</span></code> must return <code class="docutils
literal"><span class="pre">null</span></code> for zero output.
- You can modify the record’s key and
value, including their types.</p>
- <p><strong>Marks the stream for data
re-partitioning:</strong>
- Applying a grouping or a join after <code
class="docutils literal"><span class="pre">transform</span></code> will result
in re-partitioning of the records.
- If possible use <code class="docutils
literal"><span class="pre">transformValues</span></code> instead, which will
not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transform</span></code> is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Transformer</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">javadocs</a>.
- </p>
- </td>
- </tr>
- <tr class="row-even"><td><p
class="first"><strong>Transform (values only)</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- <li>KTable -> KTable</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">ValueTransformer</span></code> to each record, while
retaining the key of the original record.
- <code class="docutils literal"><span
class="pre">transformValues()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into exactly
one output record (zero output records or multiple output records are not
possible).
- The <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> may return <code class="docutils
literal"><span class="pre">null</span></code> as the new value for a record.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is preferable to <code
class="docutils literal"><span class="pre">transform</span></code> because it
will not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is essentially equivalent to adding
the <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> via <code class="docutils
literal"><span class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- </tbody>
- </table>
- <p>The following example shows how to leverage, via the <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method, a custom <code class="docutils literal"><span
class="pre">Processor</span></code> that sends an
- email notification whenever a page view count reaches a
predefined threshold.</p>
- <p>First, we need to implement a custom stream processor,
<code class="docutils literal"><span
class="pre">PopularPageEmailAlert</span></code>, that implements the <code
class="docutils literal"><span class="pre">Processor</span></code>
- interface:</p>
- <pre class="line-numbers"><code class="language-java">// A
processor that sends an alert message about a popular page to a configurable
email address
-public class PopularPageEmailAlert implements Processor<PageId, Long, Void,
Void> {
-
- private final String emailAddress;
- private ProcessorContext<Void, Void> context;
-
- public PopularPageEmailAlert(String emailAddress) {
- this.emailAddress = emailAddress;
- }
+ <div class="section"
id="migrating-from-transform-methods-to-processor-api-papi">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-methods-to-processor-api-papi"
+ title="Permalink to this headline">
+ Migrating from transform Methods to Processor API (PAPI)
+ </a>
+ </h2>
+ <h3>Overview of Changes</h3>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka Streams
API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>, and
<code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the more
versatile Processor API. This
+ guide provides detailed steps for migrating existing code to use
the new Processor API and
+ explains the benefits of the changes.
+ </p>
+ <p>The following deprecated methods are no longer available in Kafka
Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The Processor API now serves as a unified replacement for all these
methods. It simplifies the
+ API surface while maintaining support for both stateless and
stateful operations.</p>
+ <h3>Migration Process</h3>
+ <p>The migration process consists of:</p>
+ <ol>
+ <li>
+ Replace <code>Transformer</code> with <code>Processor</code>
or <code>ValueTransformer</code> with
+ <code>FixedKeyProcessor</code>;
+ </li>
+ <li>
+ Replace record <code>key</code> and <code>value</code> with
<code>Record</code> or <code>FixedKeyRecord</code>;
+ </li>
+ <li>
+ Rewrite the <code>transform</code> method of
<code>Transformer</code> and <code>ValueTransformer</code> as
+ <code>process</code> or <code>processValues</code>;
+ </li>
+ <li>
+ Use the new <code>Record</code> or <code>FixedKeyRecord</code>
as argument of the renamed method;</li>
+ <li>
+ Rewrite the return type of the renamed method to
<code>void</code> and forward the record through the context;
+ and finally
+ </li>
+ <li>
+ Change the <code>KStream</code> call of the
<code>transform</code> method to <code>process</code> or
+ <code>processValues</code>.
+ </li>
+ </ol>
+ <h3>Migration Examples</h3>
+ <p>
+ To migrate from the deprecated <code>transform</code>,
<code>transformValues</code>, <code>flatTransform</code>, and
+ <code>flatTransformValues</code> methods to the Process API (PAPI)
in Kafka Streams, follow these examples. The new
+ <code>process</code> and <code>processValues</code> APIs enable a
more flexible and reusable approach by requiring
+ implementations of the <code>Processor</code> or
<code>FixedKeyProcessor</code> interfaces.
+ </p>
+ <p>Here are examples to help you migrate:</p>
+ <table>
+ <thead>
+ <tr>
+ <th>Example</th>
+ <th>Migrating from</th>
+ <th>Migrating to</th>
+ <th>State Type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><a
href="#cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a
Loyalty Program</a></td>
+ <td><code>transform</code></td>
+ <td><code>process</code></td>
+ <td>Stateful</td>
+ </tr>
+ <tr>
+ <td><a href="#categorizing-logs-by-severity">Categorizing Logs
by Severity</a></td>
+ <td><code>flatTransform</code></td>
+ <td><code>process</code></td>
+ <td>Stateless</td>
+ </tr>
+ <tr>
+ <td><a href="#traffic-radar-monitoring-car-count">Traffic
Radar Monitoring Car Count</a></td>
+ <td><code>transformValues</code></td>
+ <td><code>processValues</code></td>
+ <td>Stateful</td>
+ </tr>
+ <tr>
+ <td><a href="#replacing-slang-in-text-messages">Replacing
Slang in Text Messages</a></td>
+ <td><code>flatTransformValues</code></td>
+ <td><code>processValues</code></td>
+ <td>Stateless</td>
+ </tr>
+ </tbody>
+ </table>
+ <h4>Stateless Examples</h4>
+ <h5 id="categorizing-logs-by-severity">Categorizing Logs by
Severity</h5>
+ <ul>
+ <li>
+ <strong>Idea:</strong> You have a stream of log messages. Each
message contains a severity level (e.g., INFO,
+ WARN, ERROR) in the value. The processor filters messages,
routing ERROR messages to a dedicated topic and
+ discarding INFO messages. The rest (WARN) are forwarded to
another processor.
+ </li>
+ <li>
+ <strong>Real-World Context:</strong> In a production
monitoring system, categorizing logs by severity ensures
+ ERROR logs are sent to a critical incident management system,
WARN logs are analyzed for potential risks, and
+ INFO logs are stored for basic reporting purposes.
+ </li>
+ </ul>
+ <p>
+ Below, methods <code>categorizeWithFlatTransform</code> and
<code>categorizeWithProcess</code> show how you can
+ migrate from <code>flatTransform</code> to <code>process</code>.
+ </p>
+ <pre class="line-numbers"><code class="language-java">package
org.apache.kafka.streams.kstream;
- @Override
- public void init(ProcessorContext<Void, Void> context) {
- this.context = context;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+
+import java.util.Collections;
+import java.util.List;
+
+public class CategorizingLogsBySeverityExample {
+ private static final String ERROR_LOGS_TOPIC =
"error-logs-topic";
+ private static final String INPUT_LOGS_TOPIC =
"input-logs-topic";
+ private static final String UNKNOWN_LOGS_TOPIC =
"unknown-logs-topic";
+ private static final String WARN_LOGS_TOPIC = "warn-logs-topic";
+
+ public static void categorizeWithFlatTransform(final StreamsBuilder
builder) {
+ final KStream<String, String> logStream =
builder.stream(INPUT_LOGS_TOPIC);
+ logStream.flatTransform(() -> new LogSeverityTransformer())
+ .to((key, value, recordContext) -> {
+ // Determine the target topic dynamically
+ if ("ERROR".equals(key)) return ERROR_LOGS_TOPIC;
+ if ("WARN".equals(key)) return WARN_LOGS_TOPIC;
+ return UNKNOWN_LOGS_TOPIC;
+ });
+ }
- // Here you would perform any additional initializations such as setting
up an email client.
- }
+ public static void categorizeWithProcess(final StreamsBuilder builder) {
+ final KStream<String, String> logStream =
builder.stream(INPUT_LOGS_TOPIC);
+ logStream.process(LogSeverityProcessor::new);
+ }
- @Override
- void process(Record<PageId, Long> record) {
- // Here you would format and send the alert email.
- //
- // In this specific example, you would be able to include
- // information about the page's ID and its view count
- }
+ private static class LogSeverityTransformer implements
Transformer<String, String, Iterable<KeyValue<String,
String>>> {
+ @Override
+ public void init(org.apache.kafka.streams.processor.ProcessorContext
context) {
+ }
- @Override
- void close() {
- // Any code for clean up would go here, for example tearing down the email
client and anything
- // else you created in the init() method
- // This processor instance will not be used again after this call.
- }
+ @Override
+ public Iterable<KeyValue<String, String>> transform(String
key, String value) {
+ if (value == null) {
+ return Collections.emptyList(); // Skip null values
+ }
-}</code></pre>
- <div class="admonition tip">
- <p><b>Tip</b></p>
- <p class="last">Even though we do not demonstrate it in
this example, a stream processor can access any available state stores by
- calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>.
- State stores are only available if they have been
connected to the processor, or if they are global stores. While global stores
do not need to be connected explicitly, they only allow for read-only access.
- There are two ways to connect state stores to a
processor:
- <ul class="simple">
- <li>By passing the name of a store that has already
been added via <code class="docutils literal"><span
class="pre">Topology#addStateStore()</span></code> to the corresponding <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method call.</li>
- <li>Implementing <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
- passed to <code class="docutils literal"><span
class="pre">KStream#process()</span></code>. In this case there is no need to
call <code class="docutils literal"><span
class="pre">StreamsBuilder#addStateStore()</span></code>
- beforehand, the store will be automatically added
for you. You can also implement <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the
- <code class="docutils literal"><span
class="pre">Value*</span></code> or <code class="docutils literal"><span
class="pre">*WithKey</span></code> supplier variants, or <code class="docutils
literal"><span class="pre">TransformerSupplier</span></code> or any of its
variants.
- </li>
- </ul>
- </div>
- <p>Then we can leverage the <code class="docutils
literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the
DSL via <code class="docutils literal"><span
class="pre">KStream#process</span></code>.</p>
- <pre class="line-numbers"><code
class="language-java">KStream<String, GenericRecord> pageViews = ...;
-
-// Send an email notification when the view count of a page reaches one
thousand.
-pageViews.groupByKey()
- .count()
- .filter((PageId pageId, Long viewCount) -> viewCount == 1000)
- // PopularPageEmailAlert is your custom processor that implements the
- // `Processor` interface, see further down below.
- .process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
- </div>
+ // Assume the severity is the first word in the log message
+ // For example: "ERROR: Disk not found" ->
"ERROR"
+ int colonIndex = value.indexOf(':');
+ String severity = colonIndex > 0 ? value.substring(0,
colonIndex).trim() : "UNKNOWN";
+
+ // Create appropriate KeyValue pair based on severity
+ return switch (severity) {
+ case "ERROR" -> List.of(new
KeyValue<>("ERROR", value));
+ case "WARN" -> List.of(new
KeyValue<>("WARN", value));
+ case "INFO" -> Collections.emptyList(); // INFO
logs are ignored
+ default -> List.of(new
KeyValue<>("UNKNOWN", value));
+ };
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+
+ private static class LogSeverityProcessor implements Processor<String,
String, String, String> {
+ private ProcessorContext<String, String> context;
+
+ @Override
+ public void init(final ProcessorContext<String, String> context)
{
+ this.context = context;
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ if (record.value() == null) {
+ return; // Skip null values
+ }
+
+ // Assume the severity is the first word in the log message
+ // For example: "ERROR: Disk not found" ->
"ERROR"
+ final int colonIndex = record.value().indexOf(':');
+ final String severity = colonIndex > 0 ?
record.value().substring(0, colonIndex).trim() : "UNKNOWN";
+
+ // Route logs based on severity
+ switch (severity) {
+ case "ERROR":
+ context.forward(new Record<>(ERROR_LOGS_TOPIC,
record.value(), record.timestamp()));
+ break;
+ case "WARN":
+ context.forward(new Record<>(WARN_LOGS_TOPIC,
record.value(), record.timestamp()));
+ break;
+ case "INFO":
+ // INFO logs are ignored
+ break;
+ default:
+ // Forward to an "unknown" topic for logs with
unrecognized severities
+ context.forward(new Record<>(UNKNOWN_LOGS_TOPIC,
record.value(), record.timestamp()));
+ }
+ }
+ }
+}
+</code></pre>
+ <h5 id="replacing-slang-in-text-messages">Replacing Slang in Text
Messages</h5>
+ <ul>
+ <li>
+ <strong>Idea:</strong> A messaging stream contains
user-generated content, and you want to replace slang words
+ with their formal equivalents (e.g., "u" becomes
"you", "brb" becomes "be
+ right back"). The operation only modifies the message
value and keeps the key intact.
+ </li>
+ <li>
+ <strong>Real-World Context:</strong> In customer support chat
systems, normalizing text by replacing slang with
+ formal equivalents ensures that automated sentiment analysis
tools work accurately and provide reliable
+ insights.
+ </li>
+ </ul>
+ <p>
+ Below, methods <code>replaceWithFlatTransformValues</code> and
<code>replaceWithProcessValues</code> show how you
+ can migrate from <code>flatTransformValues</code> to
<code>processValues</code>.
+ </p>
+ <pre class="line-numbers"><code class="language-java">package
org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+public class ReplacingSlangTextInMessagesExample {
+ private static final Map<String, String> SLANG_DICTIONARY = Map.of(
+ "u", "you",
+ "brb", "be right back",
+ "omg", "oh my god",
+ "btw", "by the way"
+ );
+ private static final String INPUT_MESSAGES_TOPIC =
"input-messages-topic";
+ private static final String OUTPUT_MESSAGES_TOPIC =
"output-messages-topic";
+
+ public static void replaceWithFlatTransformValues(final StreamsBuilder
builder) {
+ KStream<String, String> messageStream =
builder.stream(INPUT_MESSAGES_TOPIC);
+
messageStream.flatTransformValues(SlangReplacementTransformer::new).to(OUTPUT_MESSAGES_TOPIC);
+ }
+
+ public static void replaceWithProcessValues(final StreamsBuilder builder) {
+ KStream<String, String> messageStream =
builder.stream(INPUT_MESSAGES_TOPIC);
+
messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
+ }
+
+ private static class SlangReplacementTransformer implements
ValueTransformer<String, Iterable<String>> {
+ @Override
+ public void init(final
org.apache.kafka.streams.processor.ProcessorContext context) {
+ }
+
+ @Override
+ public Iterable<String> transform(final String value) {
+ if (value == null) {
+ return Collections.emptyList(); // Skip null values
+ }
+
+ // Replace slang words in the message
+ final String[] words = value.split("\\s+");
+ return Arrays.asList(
+ Arrays.stream(words)
+ .map(word -> SLANG_DICTIONARY.getOrDefault(word, word))
+ .toArray(String[]::new)
Review Comment:
Why is this returning an `String[]` (if I read it correctly)? -- It should
return a `String` instead, so we would need a `reduce()` or something like
this? (Did you test test examples (do they compile)?
Or just use the same code as below:
```
final StringBuilder replacedMessage = new StringBuilder();
for (String word : words) {
replacedMessage.append(SLANG_DICTIONARY.getOrDefault(word,
word)).append(" ");
}
return replacedMessage.toString();
```
It's confusing if the actually business logic of replacing words is
different in both examples; it's better to have it aligned in both.
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3097,152 +3097,615 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
</div>
</div>
</div>
- <div class="section"
id="applying-processors-and-transformers-processor-api-integration">
- <span id="streams-developer-guide-dsl-process"></span><h3><a
class="toc-backref" href="#id24">Applying processors and transformers
(Processor API integration)</a><a class="headerlink"
href="#applying-processors-and-transformers-processor-api-integration"
title="Permalink to this headline"></a></h3>
- <p>Beyond the aforementioned <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateless</span></a> and
- <a class="reference internal"
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std
std-ref">stateful</span></a> transformations, you may also
- leverage the <a class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- There are a number of scenarios where this may be
helpful:</p>
- <ul class="simple">
- <li><strong>Customization:</strong> You need to implement
special, customized logic that is not or not yet available in the DSL.</li>
- <li><strong>Combining ease-of-use with full flexibility
where it’s needed:</strong> Even though you generally prefer to use
- the expressiveness of the DSL, there are certain steps
in your processing that require more flexibility and
- tinkering than the DSL provides. For example, only
the Processor API provides access to a
- record’s metadata such as its topic, partition,
and offset information.
- However, you don’t want to switch completely to
the Processor API just because of that.</li>
- <li><strong>Migrating from other tools:</strong> You are
migrating from other stream processing technologies that provide an
- imperative API, and migrating some of your legacy code
to the Processor API was faster and/or easier than to
- migrate completely to the DSL right away.</li>
- </ul>
- <table border="1" class="non-scrolling-table width-100-percent
docutils">
- <colgroup>
- <col width="19%" />
- <col width="81%" />
- </colgroup>
- <thead valign="bottom">
- <tr class="row-odd"><th class="head">Transformation</th>
- <th class="head">Description</th>
- </tr>
- </thead>
- <tbody valign="top">
- <tr class="row-even"><td><p
class="first"><strong>Process</strong></p>
- <ul class="last simple">
- <li>KStream -> void</li>
- </ul>
- </td>
- <td><p class="first"><strong>Terminal
operation.</strong> Applies a <code class="docutils literal"><span
class="pre">Processor</span></code> to each record.
- <code class="docutils literal"><span
class="pre">process()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">details</a>)</p>
- <p>This is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Processor</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- <tr class="row-odd"><td><p
class="first"><strong>Transform</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">Transformer</span></code> to each record.
- <code class="docutils literal"><span
class="pre">transform()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into zero,
one, or more output records (similar to the stateless <code class="docutils
literal"><span class="pre">flatMap</span></code>).
- The <code class="docutils literal"><span
class="pre">Transformer</span></code> must return <code class="docutils
literal"><span class="pre">null</span></code> for zero output.
- You can modify the record’s key and
value, including their types.</p>
- <p><strong>Marks the stream for data
re-partitioning:</strong>
- Applying a grouping or a join after <code
class="docutils literal"><span class="pre">transform</span></code> will result
in re-partitioning of the records.
- If possible use <code class="docutils
literal"><span class="pre">transformValues</span></code> instead, which will
not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transform</span></code> is essentially equivalent to adding the
<code class="docutils literal"><span class="pre">Transformer</span></code> via
<code class="docutils literal"><span
class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">javadocs</a>.
- </p>
- </td>
- </tr>
- <tr class="row-even"><td><p
class="first"><strong>Transform (values only)</strong></p>
- <ul class="last simple">
- <li>KStream -> KStream</li>
- <li>KTable -> KTable</li>
- </ul>
- </td>
- <td><p class="first">Applies a <code class="docutils
literal"><span class="pre">ValueTransformer</span></code> to each record, while
retaining the key of the original record.
- <code class="docutils literal"><span
class="pre">transformValues()</span></code> allows you to leverage the <a
class="reference internal"
href="processor-api.html#streams-developer-guide-processor-api"><span
class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">details</a>)</p>
- <p>Each input record is transformed into exactly
one output record (zero output records or multiple output records are not
possible).
- The <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> may return <code class="docutils
literal"><span class="pre">null</span></code> as the new value for a record.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is preferable to <code
class="docutils literal"><span class="pre">transform</span></code> because it
will not cause data re-partitioning.</p>
- <p><code class="docutils literal"><span
class="pre">transformValues</span></code> is essentially equivalent to adding
the <code class="docutils literal"><span
class="pre">ValueTransformer</span></code> via <code class="docutils
literal"><span class="pre">Topology#addProcessor()</span></code> to your
- <a class="reference internal"
href="../core-concepts.html#streams_topology"><span class="std
std-ref">processor topology</span></a>.</p>
- <p class="last">An example is available in the
- <a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">javadocs</a>.</p>
- </td>
- </tr>
- </tbody>
- </table>
- <p>The following example shows how to leverage, via the <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method, a custom <code class="docutils literal"><span
class="pre">Processor</span></code> that sends an
- email notification whenever a page view count reaches a
predefined threshold.</p>
- <p>First, we need to implement a custom stream processor,
<code class="docutils literal"><span
class="pre">PopularPageEmailAlert</span></code>, that implements the <code
class="docutils literal"><span class="pre">Processor</span></code>
- interface:</p>
- <pre class="line-numbers"><code class="language-java">// A
processor that sends an alert message about a popular page to a configurable
email address
-public class PopularPageEmailAlert implements Processor<PageId, Long, Void,
Void> {
-
- private final String emailAddress;
- private ProcessorContext<Void, Void> context;
-
- public PopularPageEmailAlert(String emailAddress) {
- this.emailAddress = emailAddress;
- }
+ <div class="section"
id="migrating-from-transform-methods-to-processor-api-papi">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-methods-to-processor-api-papi"
+ title="Permalink to this headline">
+ Migrating from transform Methods to Processor API (PAPI)
+ </a>
+ </h2>
+ <h3>Overview of Changes</h3>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka Streams
API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>, and
<code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the more
versatile Processor API. This
+ guide provides detailed steps for migrating existing code to use
the new Processor API and
+ explains the benefits of the changes.
+ </p>
+ <p>The following deprecated methods are no longer available in Kafka
Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The Processor API now serves as a unified replacement for all these
methods. It simplifies the
+ API surface while maintaining support for both stateless and
stateful operations.</p>
+ <h3>Migration Process</h3>
+ <p>The migration process consists of:</p>
+ <ol>
+ <li>
+ Replace <code>Transformer</code> with <code>Processor</code>
or <code>ValueTransformer</code> with
+ <code>FixedKeyProcessor</code>;
+ </li>
+ <li>
+ Replace record <code>key</code> and <code>value</code> with
<code>Record</code> or <code>FixedKeyRecord</code>;
+ </li>
+ <li>
+ Rewrite the <code>transform</code> method of
<code>Transformer</code> and <code>ValueTransformer</code> as
+ <code>process</code> or <code>processValues</code>;
+ </li>
+ <li>
+ Use the new <code>Record</code> or <code>FixedKeyRecord</code>
as argument of the renamed method;</li>
+ <li>
+ Rewrite the return type of the renamed method to
<code>void</code> and forward the record through the context;
+ and finally
+ </li>
+ <li>
+ Change the <code>KStream</code> call of the
<code>transform</code> method to <code>process</code> or
+ <code>processValues</code>.
+ </li>
+ </ol>
+ <h3>Migration Examples</h3>
+ <p>
+ To migrate from the deprecated <code>transform</code>,
<code>transformValues</code>, <code>flatTransform</code>, and
+ <code>flatTransformValues</code> methods to the Process API (PAPI)
in Kafka Streams, follow these examples. The new
+ <code>process</code> and <code>processValues</code> APIs enable a
more flexible and reusable approach by requiring
+ implementations of the <code>Processor</code> or
<code>FixedKeyProcessor</code> interfaces.
+ </p>
+ <p>Here are examples to help you migrate:</p>
+ <table>
+ <thead>
+ <tr>
+ <th>Example</th>
+ <th>Migrating from</th>
+ <th>Migrating to</th>
+ <th>State Type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><a
href="#cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a
Loyalty Program</a></td>
+ <td><code>transform</code></td>
+ <td><code>process</code></td>
+ <td>Stateful</td>
+ </tr>
+ <tr>
+ <td><a href="#categorizing-logs-by-severity">Categorizing Logs
by Severity</a></td>
+ <td><code>flatTransform</code></td>
+ <td><code>process</code></td>
+ <td>Stateless</td>
+ </tr>
+ <tr>
+ <td><a href="#traffic-radar-monitoring-car-count">Traffic
Radar Monitoring Car Count</a></td>
+ <td><code>transformValues</code></td>
+ <td><code>processValues</code></td>
+ <td>Stateful</td>
+ </tr>
+ <tr>
+ <td><a href="#replacing-slang-in-text-messages">Replacing
Slang in Text Messages</a></td>
+ <td><code>flatTransformValues</code></td>
+ <td><code>processValues</code></td>
+ <td>Stateless</td>
+ </tr>
+ </tbody>
+ </table>
+ <h4>Stateless Examples</h4>
+ <h5 id="categorizing-logs-by-severity">Categorizing Logs by
Severity</h5>
+ <ul>
+ <li>
+ <strong>Idea:</strong> You have a stream of log messages. Each
message contains a severity level (e.g., INFO,
+ WARN, ERROR) in the value. The processor filters messages,
routing ERROR messages to a dedicated topic and
+ discarding INFO messages. The rest (WARN) are forwarded to
another processor.
+ </li>
+ <li>
+ <strong>Real-World Context:</strong> In a production
monitoring system, categorizing logs by severity ensures
+ ERROR logs are sent to a critical incident management system,
WARN logs are analyzed for potential risks, and
+ INFO logs are stored for basic reporting purposes.
+ </li>
+ </ul>
+ <p>
+ Below, methods <code>categorizeWithFlatTransform</code> and
<code>categorizeWithProcess</code> show how you can
+ migrate from <code>flatTransform</code> to <code>process</code>.
+ </p>
+ <pre class="line-numbers"><code class="language-java">package
org.apache.kafka.streams.kstream;
- @Override
- public void init(ProcessorContext<Void, Void> context) {
- this.context = context;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+
+import java.util.Collections;
+import java.util.List;
+
+public class CategorizingLogsBySeverityExample {
+ private static final String ERROR_LOGS_TOPIC =
"error-logs-topic";
+ private static final String INPUT_LOGS_TOPIC =
"input-logs-topic";
+ private static final String UNKNOWN_LOGS_TOPIC =
"unknown-logs-topic";
+ private static final String WARN_LOGS_TOPIC = "warn-logs-topic";
+
+ public static void categorizeWithFlatTransform(final StreamsBuilder
builder) {
+ final KStream<String, String> logStream =
builder.stream(INPUT_LOGS_TOPIC);
+ logStream.flatTransform(() -> new LogSeverityTransformer())
+ .to((key, value, recordContext) -> {
+ // Determine the target topic dynamically
+ if ("ERROR".equals(key)) return ERROR_LOGS_TOPIC;
+ if ("WARN".equals(key)) return WARN_LOGS_TOPIC;
+ return UNKNOWN_LOGS_TOPIC;
+ });
+ }
- // Here you would perform any additional initializations such as setting
up an email client.
- }
+ public static void categorizeWithProcess(final StreamsBuilder builder) {
+ final KStream<String, String> logStream =
builder.stream(INPUT_LOGS_TOPIC);
+ logStream.process(LogSeverityProcessor::new);
+ }
- @Override
- void process(Record<PageId, Long> record) {
- // Here you would format and send the alert email.
- //
- // In this specific example, you would be able to include
- // information about the page's ID and its view count
- }
+ private static class LogSeverityTransformer implements
Transformer<String, String, Iterable<KeyValue<String,
String>>> {
+ @Override
+ public void init(org.apache.kafka.streams.processor.ProcessorContext
context) {
+ }
- @Override
- void close() {
- // Any code for clean up would go here, for example tearing down the email
client and anything
- // else you created in the init() method
- // This processor instance will not be used again after this call.
- }
+ @Override
+ public Iterable<KeyValue<String, String>> transform(String
key, String value) {
+ if (value == null) {
+ return Collections.emptyList(); // Skip null values
+ }
-}</code></pre>
- <div class="admonition tip">
- <p><b>Tip</b></p>
- <p class="last">Even though we do not demonstrate it in
this example, a stream processor can access any available state stores by
- calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>.
- State stores are only available if they have been
connected to the processor, or if they are global stores. While global stores
do not need to be connected explicitly, they only allow for read-only access.
- There are two ways to connect state stores to a
processor:
- <ul class="simple">
- <li>By passing the name of a store that has already
been added via <code class="docutils literal"><span
class="pre">Topology#addStateStore()</span></code> to the corresponding <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method call.</li>
- <li>Implementing <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
- passed to <code class="docutils literal"><span
class="pre">KStream#process()</span></code>. In this case there is no need to
call <code class="docutils literal"><span
class="pre">StreamsBuilder#addStateStore()</span></code>
- beforehand, the store will be automatically added
for you. You can also implement <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the
- <code class="docutils literal"><span
class="pre">Value*</span></code> or <code class="docutils literal"><span
class="pre">*WithKey</span></code> supplier variants, or <code class="docutils
literal"><span class="pre">TransformerSupplier</span></code> or any of its
variants.
- </li>
- </ul>
- </div>
- <p>Then we can leverage the <code class="docutils
literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the
DSL via <code class="docutils literal"><span
class="pre">KStream#process</span></code>.</p>
- <pre class="line-numbers"><code
class="language-java">KStream<String, GenericRecord> pageViews = ...;
-
-// Send an email notification when the view count of a page reaches one
thousand.
-pageViews.groupByKey()
- .count()
- .filter((PageId pageId, Long viewCount) -> viewCount == 1000)
- // PopularPageEmailAlert is your custom processor that implements the
- // `Processor` interface, see further down below.
- .process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
- </div>
+ // Assume the severity is the first word in the log message
+ // For example: "ERROR: Disk not found" ->
"ERROR"
+ int colonIndex = value.indexOf(':');
+ String severity = colonIndex > 0 ? value.substring(0,
colonIndex).trim() : "UNKNOWN";
+
+ // Create appropriate KeyValue pair based on severity
+ return switch (severity) {
+ case "ERROR" -> List.of(new
KeyValue<>("ERROR", value));
+ case "WARN" -> List.of(new
KeyValue<>("WARN", value));
+ case "INFO" -> Collections.emptyList(); // INFO
logs are ignored
+ default -> List.of(new
KeyValue<>("UNKNOWN", value));
+ };
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+
+ private static class LogSeverityProcessor implements Processor<String,
String, String, String> {
+ private ProcessorContext<String, String> context;
+
+ @Override
+ public void init(final ProcessorContext<String, String> context)
{
+ this.context = context;
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ if (record.value() == null) {
+ return; // Skip null values
+ }
+
+ // Assume the severity is the first word in the log message
+ // For example: "ERROR: Disk not found" ->
"ERROR"
+ final int colonIndex = record.value().indexOf(':');
+ final String severity = colonIndex > 0 ?
record.value().substring(0, colonIndex).trim() : "UNKNOWN";
+
+ // Route logs based on severity
+ switch (severity) {
+ case "ERROR":
+ context.forward(new Record<>(ERROR_LOGS_TOPIC,
record.value(), record.timestamp()));
+ break;
+ case "WARN":
+ context.forward(new Record<>(WARN_LOGS_TOPIC,
record.value(), record.timestamp()));
+ break;
+ case "INFO":
+ // INFO logs are ignored
+ break;
+ default:
+ // Forward to an "unknown" topic for logs with
unrecognized severities
+ context.forward(new Record<>(UNKNOWN_LOGS_TOPIC,
record.value(), record.timestamp()));
+ }
+ }
+ }
+}
+</code></pre>
+ <h5 id="replacing-slang-in-text-messages">Replacing Slang in Text
Messages</h5>
+ <ul>
+ <li>
+ <strong>Idea:</strong> A messaging stream contains
user-generated content, and you want to replace slang words
+ with their formal equivalents (e.g., "u" becomes
"you", "brb" becomes "be
+ right back"). The operation only modifies the message
value and keeps the key intact.
+ </li>
+ <li>
+ <strong>Real-World Context:</strong> In customer support chat
systems, normalizing text by replacing slang with
+ formal equivalents ensures that automated sentiment analysis
tools work accurately and provide reliable
+ insights.
+ </li>
+ </ul>
+ <p>
+ Below, methods <code>replaceWithFlatTransformValues</code> and
<code>replaceWithProcessValues</code> show how you
+ can migrate from <code>flatTransformValues</code> to
<code>processValues</code>.
+ </p>
+ <pre class="line-numbers"><code class="language-java">package
org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+public class ReplacingSlangTextInMessagesExample {
+ private static final Map<String, String> SLANG_DICTIONARY = Map.of(
+ "u", "you",
+ "brb", "be right back",
+ "omg", "oh my god",
+ "btw", "by the way"
+ );
+ private static final String INPUT_MESSAGES_TOPIC =
"input-messages-topic";
+ private static final String OUTPUT_MESSAGES_TOPIC =
"output-messages-topic";
+
+ public static void replaceWithFlatTransformValues(final StreamsBuilder
builder) {
+ KStream<String, String> messageStream =
builder.stream(INPUT_MESSAGES_TOPIC);
+
messageStream.flatTransformValues(SlangReplacementTransformer::new).to(OUTPUT_MESSAGES_TOPIC);
+ }
+
+ public static void replaceWithProcessValues(final StreamsBuilder builder) {
+ KStream<String, String> messageStream =
builder.stream(INPUT_MESSAGES_TOPIC);
+
messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
+ }
+
+ private static class SlangReplacementTransformer implements
ValueTransformer<String, Iterable<String>> {
+ @Override
+ public void init(final
org.apache.kafka.streams.processor.ProcessorContext context) {
+ }
+
+ @Override
+ public Iterable<String> transform(final String value) {
+ if (value == null) {
+ return Collections.emptyList(); // Skip null values
+ }
+
+ // Replace slang words in the message
+ final String[] words = value.split("\\s+");
+ return Arrays.asList(
+ Arrays.stream(words)
+ .map(word -> SLANG_DICTIONARY.getOrDefault(word, word))
+ .toArray(String[]::new)
+ );
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+
+ private static class SlangReplacementProcessor implements
FixedKeyProcessor<String, String, String> {
Review Comment:
`ContextualFixedKeyProcessor` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]