Repository: kafka Updated Branches: refs/heads/trunk 4086db472 -> 94a6d6c02
KAFKA-5454: Add a new Kafka Streams example IoT oriented Added a Kafka Streams example (IoT oriented) using "tumbling" window Author: Paolo Patierno <[email protected]> Author: ppatierno <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Michael G. Noll <[email protected]> Closes #3352 from ppatierno/stream-temperature-example Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94a6d6c0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94a6d6c0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94a6d6c0 Branch: refs/heads/trunk Commit: 94a6d6c02d478f5ccec1928163ff921688155a01 Parents: 4086db4 Author: Paolo Patierno <[email protected]> Authored: Tue Aug 1 11:22:49 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Aug 1 11:22:49 2017 -0700 ---------------------------------------------------------------------- .../examples/temperature/TemperatureDemo.java | 144 +++++++++++++++++++ 1 file changed, 144 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/94a6d6c0/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java new file mode 100644 index 0000000..764210b --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.examples.temperature; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.WindowedDeserializer; +import org.apache.kafka.streams.kstream.internals.WindowedSerializer; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Demonstrates, using the high-level KStream DSL, how to implement an IoT demo application + * which ingests temperature value processing the maximum value in the latest TEMPERATURE_WINDOW_SIZE seconds (which + * is 5 seconds) sending a new message if it exceeds the TEMPERATURE_THRESHOLD (which is 20) + * + * In this example, the input stream reads from a topic named "iot-temperature", where the values of messages + * represent temperature values; using a TEMPERATURE_WINDOW_SIZE seconds "tumbling" window, the maximum value is processed and + * sent to a topic named "iot-temperature-max" if it exceeds the TEMPERATURE_THRESHOLD. + * + * Before running this example you must create the input topic for temperature values in the following way : + * + * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic iot-temperature + * + * and at same time the output topic for filtered values : + * + * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic iot-temperature-max + * + * After that, a console consumer can be started in order to read filtered values from the "iot-temperature-max" topic : + * + * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic iot-temperature-max --from-beginning + * + * On the other side, a console producer can be used for sending temperature values (which needs to be integers) + * to "iot-temperature" typing them on the console : + * + * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic iot-temperature + * > 10 + * > 15 + * > 22 + */ +public class TemperatureDemo { + + // threshold used for filtering max temperature values + private static final int TEMPERATURE_THRESHOLD = 20; + // window size within which the filtering is applied + private static final int TEMPERATURE_WINDOW_SIZE = 5; + + public static void main(String[] args) throws Exception { + + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + + StreamsBuilder builder = new StreamsBuilder(); + + KStream<String, String> source = builder.stream("iot-temperature"); + + KStream<Windowed<String>, String> max = source + // temperature values are sent without a key (null), so in order + // to group and reduce them, a key is needed ("temp" has been chosen) + .selectKey(new KeyValueMapper<String, String, String>() { + @Override + public String apply(String key, String value) { + return "temp"; + } + }) + .groupByKey() + .reduce(new Reducer<String>() { + @Override + public String apply(String value1, String value2) { + if (Integer.parseInt(value1) > Integer.parseInt(value2)) + return value1; + else + return value2; + } + }, TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE))) + .toStream() + .filter(new Predicate<Windowed<String>, String>() { + @Override + public boolean test(Windowed<String> key, String value) { + return Integer.parseInt(value) > TEMPERATURE_THRESHOLD; + } + }); + + WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer()); + WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer()); + Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer); + + // need to override key serde to Windowed<String> type + max.to(windowedSerde, Serdes.String(), "iot-temperature-max"); + + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final CountDownLatch latch = new CountDownLatch(1); + + // attach shutdown handler to catch control-c + Runtime.getRuntime().addShutdownHook(new Thread("streams-temperature-shutdown-hook") { + @Override + public void run() { + streams.close(); + latch.countDown(); + } + }); + + try { + streams.start(); + latch.await(); + } catch (Throwable e) { + Exit.exit(1); + } + Exit.exit(0); + } +}
