fonsdant commented on PR #18314:
URL: https://github.com/apache/kafka/pull/18314#issuecomment-2566716097
I have written a draft in Markdown before converting it to HTML, which I hope helps the review. Next, I will translate it to HTML.
***
# Migrating from transform to process
To migrate from the deprecated `transform`, `transformValues`, `flatTransform`, and `flatTransformValues` methods to the
`process` and `processValues` APIs in Kafka Streams, follow the examples below. The new APIs enable a more flexible and
reusable approach by requiring implementations of the `Processor` or `FixedKeyProcessor` interfaces; a generic skeleton
follows the table below.
Here are examples to help you migrate:
| Example | Migrating from | Migrating to | Type |
|---------|----------------|--------------|------|
| [Cumulative Discounts for a Loyalty Program](#cumulative-discounts-for-a-loyalty-program) | `transform` | `process` | Stateful |
| [Categorizing Logs by Severity](#categorizing-logs-by-severity) | `flatTransform` | `process` | Stateless |
| [Traffic Radar Monitoring Car Count](#traffic-radar-monitoring-car-count) | `transformValues` | `processValues` | Stateful |
| [Replacing Slang in Text Messages](#replacing-slang-in-text-messages) | `flatTransformValues` | `processValues` | Stateless |
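All four migrations follow the same pattern, so here is a generic skeleton first. It is a minimal sketch, not taken from any of the examples: the class `UppercaseProcessor` and the topic names are placeholders. The key shift is that a `Transformer` returns its result from `transform`, while a `Processor` receives a typed `Record` and forwards zero or more records through its `ProcessorContext`.
```java
// Generic migration skeleton; the class and topic names are placeholders.
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class UppercaseProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        // Keep the context; records are forwarded through it instead of returned
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // A Processor may forward zero, one, or many records per input
        if (record.value() != null) {
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }

    // Wiring looks the same as before, with process() replacing transform()
    public static void wire(final StreamsBuilder builder) {
        final KStream<String, String> stream = builder.stream("input-topic");
        stream.process(UppercaseProcessor::new).to("output-topic");
    }
}
```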
## Stateless Examples
### Categorizing Logs by Severity
* **Idea:** You have a stream of log messages. Each message contains a
  severity level (e.g., INFO, WARN, ERROR) in the value. The processor routes
  ERROR messages to a dedicated topic, routes WARN messages to a warning
  topic, discards INFO messages, and sends messages with unrecognized
  severities to an "unknown" topic.
* **Real-World Context:** In a production monitoring system, categorizing
logs by severity ensures ERROR logs are sent
to a critical incident management system, WARN logs are analyzed for
potential risks, and INFO logs are stored for
basic reporting purposes.
Below, methods `categorizeWithFlatTransform` and `categorizeWithProcess`
show how you can migrate from `flatTransform`
to `process`.
```java
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

import java.util.Collections;
import java.util.List;

public class CategorizingLogsBySeverityExample {
    private static final String ERROR_LOGS_TOPIC = "error-logs-topic";
    private static final String INPUT_LOGS_TOPIC = "input-logs-topic";
    private static final String UNKNOWN_LOGS_TOPIC = "unknown-logs-topic";
    private static final String WARN_LOGS_TOPIC = "warn-logs-topic";

    public static void categorizeWithFlatTransform(final StreamsBuilder builder) {
        final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC);
        logStream.flatTransform(LogSeverityTransformer::new)
                .to((key, value, recordContext) -> {
                    // Determine the target topic dynamically from the record key
                    if ("ERROR".equals(key)) return ERROR_LOGS_TOPIC;
                    if ("WARN".equals(key)) return WARN_LOGS_TOPIC;
                    return UNKNOWN_LOGS_TOPIC;
                });
    }

    public static void categorizeWithProcess(final StreamsBuilder builder) {
        final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC);
        // The processor sets the record key to the target topic name,
        // so the TopicNameExtractor can simply return the key
        logStream.process(LogSeverityProcessor::new)
                .to((key, value, recordContext) -> key);
    }

    private static class LogSeverityTransformer implements Transformer<String, String, Iterable<KeyValue<String, String>>> {
        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
        }

        @Override
        public Iterable<KeyValue<String, String>> transform(final String key, final String value) {
            if (value == null) {
                return Collections.emptyList(); // Skip null values
            }
            // Assume the severity is the first word in the log message,
            // for example: "ERROR: Disk not found" -> "ERROR"
            final int colonIndex = value.indexOf(':');
            final String severity = colonIndex > 0 ? value.substring(0, colonIndex).trim() : "UNKNOWN";
            // Create the appropriate KeyValue pair based on the severity
            return switch (severity) {
                case "ERROR" -> List.of(new KeyValue<>("ERROR", value));
                case "WARN" -> List.of(new KeyValue<>("WARN", value));
                case "INFO" -> Collections.emptyList(); // INFO logs are ignored
                default -> List.of(new KeyValue<>("UNKNOWN", value));
            };
        }

        @Override
        public void close() {
        }
    }

    private static class LogSeverityProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            if (record.value() == null) {
                return; // Skip null values
            }
            // Assume the severity is the first word in the log message,
            // for example: "ERROR: Disk not found" -> "ERROR"
            final int colonIndex = record.value().indexOf(':');
            final String severity = colonIndex > 0 ? record.value().substring(0, colonIndex).trim() : "UNKNOWN";
            // Route logs by setting the target topic name as the record key
            switch (severity) {
                case "ERROR":
                    context.forward(new Record<>(ERROR_LOGS_TOPIC, record.value(), record.timestamp()));
                    break;
                case "WARN":
                    context.forward(new Record<>(WARN_LOGS_TOPIC, record.value(), record.timestamp()));
                    break;
                case "INFO":
                    // INFO logs are ignored
                    break;
                default:
                    // Forward logs with unrecognized severities to an "unknown" topic
                    context.forward(new Record<>(UNKNOWN_LOGS_TOPIC, record.value(), record.timestamp()));
            }
        }
    }
}
```
### Replacing Slang in Text Messages
* **Idea:** A messaging stream contains user-generated content, and you want
  to replace slang words with their formal equivalents (e.g., "u" becomes
  "you", "brb" becomes "be right back"). The operation modifies only the
  message values and keeps the keys intact; because a message is split into
  words, one input record may produce several output records.
* **Real-World Context:** In customer support chat systems, normalizing text
by replacing slang with formal equivalents
ensures that automated sentiment analysis tools work accurately and
provide reliable insights.
Below, methods `replaceWithFlatTransformValues` and
`replaceWithProcessValues` show how you can migrate from
`flatTransformValues` to `processValues`.
```java
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

public class ReplacingSlangTextInMessagesExample {
    private static final Map<String, String> SLANG_DICTIONARY = Map.of(
            "u", "you",
            "brb", "be right back",
            "omg", "oh my god",
            "btw", "by the way"
    );
    private static final String INPUT_MESSAGES_TOPIC = "input-messages-topic";
    private static final String OUTPUT_MESSAGES_TOPIC = "output-messages-topic";

    public static void replaceWithFlatTransformValues(final StreamsBuilder builder) {
        final KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
        messageStream.flatTransformValues(SlangReplacementTransformer::new)
                .to(OUTPUT_MESSAGES_TOPIC);
    }

    public static void replaceWithProcessValues(final StreamsBuilder builder) {
        final KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
        messageStream.processValues(SlangReplacementProcessor::new)
                .to(OUTPUT_MESSAGES_TOPIC);
    }

    private static class SlangReplacementTransformer implements ValueTransformer<String, Iterable<String>> {
        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
        }

        @Override
        public Iterable<String> transform(final String value) {
            if (value == null) {
                return Collections.emptyList(); // Skip null values
            }
            // Emit one record per word, with slang words replaced
            final String[] words = value.split("\\s+");
            return Arrays.asList(
                    Arrays.stream(words)
                            .map(word -> SLANG_DICTIONARY.getOrDefault(word, word))
                            .toArray(String[]::new)
            );
        }

        @Override
        public void close() {
        }
    }

    private static class SlangReplacementProcessor implements FixedKeyProcessor<String, String, String> {
        private FixedKeyProcessorContext<String, String> context;

        @Override
        public void init(final FixedKeyProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final FixedKeyRecord<String, String> record) {
            if (record.value() == null) {
                return; // Skip null values
            }
            // Emit one record per word, with slang words replaced,
            // mirroring the multiple outputs of flatTransformValues
            final String[] words = record.value().split("\\s+");
            for (final String word : words) {
                context.forward(record.withValue(SLANG_DICTIONARY.getOrDefault(word, word)));
            }
        }
    }
}
```
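Either variant can be verified with `TopologyTestDriver` before deployment. Below is a minimal sketch, not part of the migration itself; it assumes the `kafka-streams-test-utils` artifact is on the classpath and a Kafka Streams version (2.8+) whose `TopologyTestDriver` no longer requires `application.id` and `bootstrap.servers`:
```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

public class SlangReplacementMigrationCheck {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // Swap in replaceWithFlatTransformValues to compare the two variants
        ReplacingSlangTextInMessagesExample.replaceWithProcessValues(builder);

        // The topology relies on default serdes, so configure them here
        final Properties props = new Properties();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input = driver.createInputTopic(
                    "input-messages-topic", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "output-messages-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("user-1", "brb u omg");
            // Expect one record per word: "be right back", "you", "oh my god"
            output.readValuesToList().forEach(System.out::println);
        }
    }
}
```
Running the same input against `replaceWithFlatTransformValues` should yield identical output, which is a quick way to confirm the migration preserved behavior.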
## Stateful Examples
### Cumulative Discounts for a Loyalty Program
* **Idea:** A stream of purchase events contains user IDs and transaction
  amounts. Use a state store to accumulate the total spending of each user.
  When a user's total crosses a threshold, emit a discount notification and
  reduce the stored total by the threshold.
* **Real-World Context:** In a retail loyalty program, tracking cumulative
customer spending enables dynamic rewards,
such as issuing a discount when a customer’s total purchases exceed a
predefined limit.
Below, methods `applyDiscountWithTransform` and `applyDiscountWithProcess`
show how you can migrate from `transform` to
`process`.
```java
package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class CumulativeDiscountsForALoyaltyProgramExample {
    private static final double DISCOUNT_THRESHOLD = 100.0;
    private static final String CUSTOMER_SPENDING_STORE = "customer-spending-store";
    private static final String DISCOUNT_NOTIFICATION_MESSAGE =
            "Discount applied! You have received a reward for your purchases.";
    private static final String DISCOUNT_NOTIFICATIONS_TOPIC = "discount-notifications-topic";
    private static final String PURCHASE_EVENTS_TOPIC = "purchase-events-topic";

    public static void applyDiscountWithTransform(final StreamsBuilder builder) {
        // Define the state store for tracking cumulative spending
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
                        Serdes.String(),
                        Serdes.Double()
                )
        );
        final KStream<String, Double> purchaseStream = builder.stream(PURCHASE_EVENTS_TOPIC);
        // Apply the Transformer with the state store
        final KStream<String, String> notificationStream =
                purchaseStream.transform(CumulativeDiscountTransformer::new, CUSTOMER_SPENDING_STORE);
        // Send the notifications to the output topic
        notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
    }

    public static void applyDiscountWithProcess(final StreamsBuilder builder) {
        // Define the state store for tracking cumulative spending
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
                        Serdes.String(),
                        Serdes.Double()
                )
        );
        final KStream<String, Double> purchaseStream = builder.stream(PURCHASE_EVENTS_TOPIC);
        // Apply the Processor with the state store
        final KStream<String, String> notificationStream =
                purchaseStream.process(CumulativeDiscountProcessor::new, CUSTOMER_SPENDING_STORE);
        // Send the notifications to the output topic
        notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
    }

    private static class CumulativeDiscountTransformer implements Transformer<String, Double, KeyValue<String, String>> {
        private KeyValueStore<String, Double> spendingStore;

        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
            // Retrieve the state store for cumulative spending
            spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
        }

        @Override
        public KeyValue<String, String> transform(final String key, final Double value) {
            if (value == null) {
                return null; // Skip null purchase amounts
            }
            // Get the current spending total for the customer
            Double currentSpending = spendingStore.get(key);
            if (currentSpending == null) {
                currentSpending = 0.0;
            }
            // Update the cumulative spending
            currentSpending += value;
            spendingStore.put(key, currentSpending);
            // Check whether the customer qualifies for a discount
            if (currentSpending >= DISCOUNT_THRESHOLD) {
                // Reset the spending after applying the discount
                spendingStore.put(key, currentSpending - DISCOUNT_THRESHOLD);
                // Return a notification message
                return new KeyValue<>(key, DISCOUNT_NOTIFICATION_MESSAGE);
            }
            return null; // No discount, so no output for this record
        }

        @Override
        public void close() {
        }
    }

    private static class CumulativeDiscountProcessor implements Processor<String, Double, String, String> {
        private KeyValueStore<String, Double> spendingStore;
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
            // Retrieve the state store for cumulative spending
            spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
        }

        @Override
        public void process(final Record<String, Double> record) {
            if (record.value() == null) {
                return; // Skip null purchase amounts
            }
            // Get the current spending total for the customer
            Double currentSpending = spendingStore.get(record.key());
            if (currentSpending == null) {
                currentSpending = 0.0;
            }
            // Update the cumulative spending
            currentSpending += record.value();
            spendingStore.put(record.key(), currentSpending);
            // Check whether the customer qualifies for a discount
            if (currentSpending >= DISCOUNT_THRESHOLD) {
                // Reset the spending after applying the discount
                spendingStore.put(record.key(), currentSpending - DISCOUNT_THRESHOLD);
                // Send a discount notification
                context.forward(new Record<>(record.key(), DISCOUNT_NOTIFICATION_MESSAGE, record.timestamp()));
            }
        }
    }
}
```
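Both variants above register the store with `builder.addStateStore(...)` and pass the store name to `transform`/`process`. With the new API you can alternatively let the `ProcessorSupplier` declare its own store by overriding `stores()` (inherited from `ConnectedStoreProvider`), so Kafka Streams registers and connects it automatically. Below is a minimal sketch, with an illustrative supplier and processor rather than the classes above:
```java
import java.util.Set;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class SelfRegisteringStoreExample {
    public static void wire(final StreamsBuilder builder) {
        final KStream<String, Double> purchases = builder.stream("purchase-events-topic");
        // No builder.addStateStore(...) and no store-name argument needed:
        // the supplier declares its own store via stores()
        purchases.process(new SpendingSupplier()).to("discount-notifications-topic");
    }

    private static class SpendingSupplier implements ProcessorSupplier<String, Double, String, String> {
        @Override
        public Set<StoreBuilder<?>> stores() {
            return Set.of(Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("customer-spending-store"),
                    Serdes.String(), Serdes.Double()));
        }

        @Override
        public Processor<String, Double, String, String> get() {
            return new Processor<>() {
                private KeyValueStore<String, Double> store;

                @Override
                public void init(final ProcessorContext<String, String> context) {
                    store = context.getStateStore("customer-spending-store");
                }

                @Override
                public void process(final Record<String, Double> record) {
                    // State-handling logic would go here, as in the example above
                }
            };
        }
    }
}
```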
### Traffic Radar Monitoring Car Count
* **Idea:** A radar monitors cars passing along a road stretch. A system
  counts the cars passing each day, maintaining a running total for the
  current day in a state store and emitting the updated count with every
  event.
* **Real-World Context:** A car counting system can be useful for
determining measures for widening or controlling
traffic depending on the number of cars passing through the monitored
stretch.
Below, methods `countWithTransformValues` and `countWithProcessValues` show
how you can migrate from `transformValues`
to `processValues`.
```java
package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TrafficRadarMonitoringCarCountExample {
    private static final String DAILY_COUNT_STORE = "daily-count-store";
    private static final String DAILY_COUNT_TOPIC = "daily-count-topic";
    private static final String RADAR_COUNT_TOPIC = "car-radar-topic";

    public static void countWithTransformValues(final StreamsBuilder builder) {
        // Define a state store for tracking daily car counts
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
                        Serdes.String(),
                        Serdes.Long()
                )
        );
        final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC);
        // Apply the ValueTransformer with the state store
        radarStream.transformValues(DailyCarCountTransformer::new, DAILY_COUNT_STORE)
                .to(DAILY_COUNT_TOPIC);
    }

    public static void countWithProcessValues(final StreamsBuilder builder) {
        // Define a state store for tracking daily car counts
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
                        Serdes.String(),
                        Serdes.Long()
                )
        );
        final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC);
        // Apply the FixedKeyProcessor with the state store
        radarStream.processValues(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
                .to(DAILY_COUNT_TOPIC);
    }

    private static class DailyCarCountTransformer implements ValueTransformer<String, String> {
        private static final DateTimeFormatter DATE_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
        private KeyValueStore<String, Long> stateStore;

        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
            // Access the state store
            stateStore = context.getStateStore(DAILY_COUNT_STORE);
        }

        @Override
        public String transform(final String value) {
            if (value == null) {
                return null; // Skip null events
            }
            // Derive the current day (wall-clock time is used for simplicity)
            final long timestamp = System.currentTimeMillis();
            final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
            // Retrieve the current count for the day
            Long dailyCount = stateStore.get(currentDay);
            if (dailyCount == null) {
                dailyCount = 0L;
            }
            // Increment the count
            dailyCount++;
            stateStore.put(currentDay, dailyCount);
            // Return the current day's count
            return String.format("Day: %s, Car Count: %s", currentDay, dailyCount);
        }

        @Override
        public void close() {
        }
    }

    private static class DailyCarCountProcessor implements FixedKeyProcessor<Void, String, String> {
        private static final DateTimeFormatter DATE_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
        private FixedKeyProcessorContext<Void, String> context;
        private KeyValueStore<String, Long> stateStore;

        @Override
        public void init(final FixedKeyProcessorContext<Void, String> context) {
            this.context = context;
            stateStore = context.getStateStore(DAILY_COUNT_STORE);
        }

        @Override
        public void process(final FixedKeyRecord<Void, String> record) {
            if (record.value() == null) {
                return; // Skip null events
            }
            // Derive the current day (wall-clock time is used for simplicity)
            final long timestamp = System.currentTimeMillis();
            final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
            // Retrieve the current count for the day
            Long dailyCount = stateStore.get(currentDay);
            if (dailyCount == null) {
                dailyCount = 0L;
            }
            // Increment the count
            dailyCount++;
            stateStore.put(currentDay, dailyCount);
            // Emit the current day's count
            context.forward(record.withValue(
                    String.format("Day: %s, Car Count: %s", currentDay, dailyCount)));
        }
    }
}
```
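The implementations above use wall-clock time for simplicity. If the day should be derived from event time instead, the new API exposes the record's timestamp directly on `FixedKeyRecord`. Below is a sketch of such a variant; the class name is illustrative, and the store name matches the example above:
```java
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueStore;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Sketch: like DailyCarCountProcessor, but the day comes from the
// record's event timestamp rather than from System.currentTimeMillis()
public class EventTimeDailyCarCountProcessor implements FixedKeyProcessor<Void, String, String> {
    private static final DateTimeFormatter DATE_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
    private FixedKeyProcessorContext<Void, String> context;
    private KeyValueStore<String, Long> stateStore;

    @Override
    public void init(final FixedKeyProcessorContext<Void, String> context) {
        this.context = context;
        stateStore = context.getStateStore("daily-count-store");
    }

    @Override
    public void process(final FixedKeyRecord<Void, String> record) {
        if (record.value() == null) {
            return; // Skip null events
        }
        // The record carries its event timestamp; no wall clock needed
        final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(record.timestamp()));
        final Long dailyCount = stateStore.get(currentDay);
        final long updatedCount = dailyCount == null ? 1L : dailyCount + 1;
        stateStore.put(currentDay, updatedCount);
        context.forward(record.withValue(
                String.format("Day: %s, Car Count: %d", currentDay, updatedCount)));
    }
}
```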
## Key Takeaways
* The `process` and `processValues` APIs use typed `Record`/`FixedKeyRecord` objects and
  `ProcessorContext`/`FixedKeyProcessorContext` for better type safety and flexibility.
* Implementations of `Processor` or `FixedKeyProcessor` should keep state handling and processing logic clear, and use
  the context's `forward()` method to emit records downstream.
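Finally, if you prefer not to manage the context field yourself, the Processor API also offers `ContextualProcessor`, which stores the context passed to `init` and exposes it through `context()`. The sketch below is illustrative only; it also shows that `Record` is immutable, so derived records are built with the `with*` methods:
```java
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;

// Illustrative only: ContextualProcessor manages the context field for you
public class EnrichingProcessor extends ContextualProcessor<String, String, String, String> {
    @Override
    public void process(final Record<String, String> record) {
        if (record.value() == null) {
            return;
        }
        // withValue/withTimestamp each return a new Record instance
        context().forward(
                record.withValue(record.value().trim())
                      .withTimestamp(record.timestamp() + 1));
    }
}
```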