fonsdant commented on PR #18314:
URL: https://github.com/apache/kafka/pull/18314#issuecomment-2585901473
@mjsax, how about using the `PopularPageEmailAlert` example from the
_Applying processors and transformers (Processor API integration)_ section?
```java
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
public class PopularPageEmailAlertExample {
private static final String ALERTS_EMAIL = "[email protected]";
private static final String PAGE_VIEWS_TOPIC = "page-views-topic";
public static void alertWithOldProcess(StreamsBuilder builder) {
KStream<String, Long> pageViews = builder.stream(PAGE_VIEWS_TOPIC);
// Filter pages with exactly 1000 views and process them using the
old API
pageViews.filter((pageId, viewCount) -> viewCount == 1000)
.process(PopularPageEmailAlertOld::new);
}
public static void alertWithNewProcess(StreamsBuilder builder) {
KStream<String, Long> pageViews = builder.stream(PAGE_VIEWS_TOPIC);
// Filter pages with exactly 1000 views and process them using the
new API
pageViews.filter((pageId, viewCount) -> viewCount == 1000)
.process(PopularPageEmailAlertNew::new);
}
private static class PopularPageEmailAlertOld extends
AbstractProcessor<String, Long> {
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext
context) {
super.init(context);
System.out.println("Initialized email client for: " +
ALERTS_EMAIL);
}
@Override
public void process(String key, Long value) {
if (value == null) return;
if (value == 1000) {
// Send an email alert
System.out.printf("ALERT (Old API): Page %s has reached 1000
views. Sending email to %s%n", key, ALERTS_EMAIL);
}
}
@Override
public void close() {
System.out.println("Tearing down email client for: " +
ALERTS_EMAIL);
}
}
private static class PopularPageEmailAlertNew implements
Processor<String, Long, Void, Void> {
@Override
public void init(ProcessorContext<Void, Void> context) {
System.out.println("Initialized email client for: " +
ALERTS_EMAIL);
}
@Override
public void process(Record<String, Long> record) {
if (record.value() == null) return;
if (record.value() == 1000) {
// Send an email alert
System.out.printf("ALERT (New API): Page %s has reached 1000
views. Sending email to %s%n", record.key(), ALERTS_EMAIL);
}
}
@Override
public void close() {
System.out.println("Tearing down email client for: " +
ALERTS_EMAIL);
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]