SAMZA-1810: Added sample for remote table

Added a sample that demonstrates the usage of a remote table backed by a RESTful service.
Author: Wei Song <ws...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #34 from weisong44/latest Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/a5e5e56b Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/a5e5e56b Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/a5e5e56b Branch: refs/heads/master Commit: a5e5e56ba05163d4d20345079587568f7a6aba4b Parents: cffd9fe Author: Wei Song <ws...@linkedin.com> Authored: Thu Aug 16 19:00:10 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Thu Aug 16 19:00:10 2018 -0700 ---------------------------------------------------------------------- bin/deploy.sh | 2 +- gradle.properties | 2 +- pom.xml | 9 +- .../config/stock-price-table-joiner.properties | 35 ++++ .../cookbook/StockPriceTableJoiner.java | 173 +++++++++++++++++++ 5 files changed, 217 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a5e5e56b/bin/deploy.sh ---------------------------------------------------------------------- diff --git a/bin/deploy.sh b/bin/deploy.sh index d65b525..5b50079 100755 --- a/bin/deploy.sh +++ b/bin/deploy.sh @@ -23,4 +23,4 @@ base_dir=`pwd` mvn clean package mkdir -p $base_dir/deploy/samza -tar -xvf $base_dir/target/hello-samza-0.14.1-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza +tar -xvf $base_dir/target/hello-samza-0.15.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a5e5e56b/gradle.properties ---------------------------------------------------------------------- diff --git a/gradle.properties b/gradle.properties index 6bc7384..37f8eb7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,7 +17,7 @@ * under the License. 
*/ -SAMZA_VERSION=0.14.1-SNAPSHOT +SAMZA_VERSION=0.15.0-SNAPSHOT KAFKA_VERSION=0.11.0.2 HADOOP_VERSION=2.6.1 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a5e5e56b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ddb3939..a41be49 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ under the License. <groupId>org.apache.samza</groupId> <artifactId>hello-samza</artifactId> - <version>0.14.1-SNAPSHOT</version> + <version>0.15.0-SNAPSHOT</version> <packaging>jar</packaging> <name>Samza Example</name> <description> @@ -143,12 +143,17 @@ under the License. <artifactId>hadoop-yarn-common</artifactId> <version>${hadoop.version}</version> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>17.0</version> + </dependency> </dependencies> <properties> <!-- maven specific properties --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <samza.version>0.14.1-SNAPSHOT</samza.version> + <samza.version>0.15.0-SNAPSHOT</samza.version> <hadoop.version>2.6.1</hadoop.version> </properties> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a5e5e56b/src/main/config/stock-price-table-joiner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/stock-price-table-joiner.properties b/src/main/config/stock-price-table-joiner.properties new file mode 100644 index 0000000..f9bd684 --- /dev/null +++ b/src/main/config/stock-price-table-joiner.properties @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=stock-price-table-joiner +job.container.count=1 +job.default.system=kafka +job.coordinator.system=kafka + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.StockPriceTableJoiner + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.consumer.zookeeper.connect=localhost:2181 +systems.kafka.producer.bootstrap.servers=localhost:9092 +systems.kafka.default.stream.replication.factor=1 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a5e5e56b/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java b/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java new file mode 100644 index 0000000..cb735d2 --- /dev/null +++ b/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.caching.CachingTableDescriptor;
+import org.apache.samza.table.remote.RemoteTableDescriptor;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.HttpUtil;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * In this example, we join a stream of stock symbols with a remote table backed by a RESTful service,
+ * which delivers the latest stock quotes. The join results contain the stock symbol and its latest
+ * price, and are delivered to an output stream.
+ *
+ * <p> A rate limit of 10 requests/second is set for the entire job; internally, Samza uses an embedded
+ * rate limiter, which evenly distributes the total rate limit among tasks.
+ *
+ * <p> A caching table with a read TTL of 5 seconds is layered over the remote table; therefore, one
+ * would receive the same quote for a symbol within this time span.
+ *
+ * <p> Concepts covered: remote table, rate limiter, caching table, stream to table joins.
+ *
+ * <p> To run the example below:
+ *
+ * <ol>
+ *   <li>
+ *     Create the Kafka topics "stock-symbol-input" and "stock-price-output" <br>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic stock-symbol-input --partitions 2 --replication-factor 1
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic stock-price-output --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the run-app.sh script <br>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/stock-price-table-joiner.properties
+ *   </li>
+ *   <li>
+ *     Consume messages from the output topic <br>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stock-price-output
+ *   </li>
+ *   <li>
+ *     Produce some messages to the input topic <br>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic stock-symbol-input --broker-list localhost:9092
+ *
+ *     After the console producer is started, type
+ *     MSFT
+ *
+ *     You should see messages like the one below in the console consumer window
+ *     {"symbol":"MSFT","close":107.64}
+ *
+ *     Note: you will need a free API key for symbols other than MSFT; see below for more information.
+ *   </li>
+ * </ol>
+ */
+public class StockPriceTableJoiner implements StreamApplication {
+
+  /**
+   * Default API key "demo" only works for symbol "MSFT"; however, you can get an
+   * API key for free at https://www.alphavantage.co/, which will work for other symbols.
+   */
+  private static final String API_KEY = "demo";
+
+  private static final String URL_TEMPLATE =
+      "https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol=%s&apikey=" + API_KEY;
+
+  private static final String INPUT_TOPIC = "stock-symbol-input";
+  private static final String OUTPUT_TOPIC = "stock-price-output";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    // Remote table whose reads are served by StockPriceReadFunction, limited to 10 reads/second.
+    Table remoteTable = graph.getTable(new RemoteTableDescriptor("remote-table")
+        .withReadRateLimit(10)
+        .withReadFunction(new StockPriceReadFunction()));
+
+    // Caching layer over the remote table; cached reads expire after the 5-second read TTL.
+    Table table = graph.getTable(new CachingTableDescriptor("table")
+        .withTable(remoteTable)
+        .withReadTtl(Duration.ofSeconds(5)));
+
+    OutputStream<StockPrice> joinResultStream = graph.getOutputStream(
+        OUTPUT_TOPIC, new JsonSerdeV2<>(StockPrice.class));
+
+    // Each input message is a bare symbol; wrap it in a KV keyed by the symbol so it can be joined
+    // against the table. The value carries no information, hence Void.
+    graph.getInputStream(INPUT_TOPIC, new StringSerde())
+        .map(symbol -> new KV<String, Void>(symbol, null))
+        .join(table, new JoinFn())
+        .sendTo(joinResultStream);
+  }
+
+  /**
+   * Joins an input symbol with the price looked up from the table, producing a {@link StockPrice}.
+   * Returns {@code null} when the table has no record for the symbol (presumably the join operator
+   * then emits nothing for that message; confirm against the Samza version in use).
+   */
+  static class JoinFn implements StreamTableJoinFunction<String, KV<String, Void>, KV<String, Double>, StockPrice> {
+    @Override
+    public StockPrice apply(KV<String, Void> message, KV<String, Double> record) {
+      return record == null ? null : new StockPrice(message.getKey(), record.getValue());
+    }
+
+    @Override
+    public String getMessageKey(KV<String, Void> message) {
+      return message.getKey();
+    }
+
+    @Override
+    public String getRecordKey(KV<String, Double> record) {
+      return record.getKey();
+    }
+  }
+
+  /**
+   * Reads a stock's closing price from the Alpha Vantage TIME_SERIES_DAILY REST endpoint.
+   * Failures (I/O, malformed JSON, non-numeric price) are rethrown as {@link SamzaException}.
+   */
+  static class StockPriceReadFunction implements TableReadFunction<String, Double> {
+    @Override
+    public CompletableFuture<Double> getAsync(String symbol) {
+      // NOTE(review): no executor is passed, so this blocking HTTP call runs on the common
+      // ForkJoinPool; fine for a demo, but a dedicated executor is preferable under load.
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          // The symbol comes straight from the input stream, so encode it before splicing it into
+          // the query string; otherwise characters like '&', '#', '%' or spaces corrupt the request.
+          URL url = new URL(String.format(URL_TEMPLATE, URLEncoder.encode(symbol, "UTF-8")));
+          String response = HttpUtil.read(url, 5000, new ExponentialSleepStrategy());
+          return parseClose(response);
+        } catch (Exception ex) {
+          throw new SamzaException(ex);
+        }
+      });
+    }
+
+    /**
+     * Returns the value of the first "4. close" field (matched case-insensitively) in the JSON
+     * response, or -1 when no such field exists. The first match is presumably the most recent
+     * trading day, assuming the API lists days newest-first; confirm against the API docs.
+     */
+    private static Double parseClose(String response) throws IOException {
+      try (JsonParser parser = new JsonFactory().createJsonParser(response)) {
+        JsonToken token;
+        // nextToken() returns null at end of input, which ends the scan.
+        while ((token = parser.nextToken()) != null) {
+          if (token == JsonToken.FIELD_NAME && "4. close".equalsIgnoreCase(parser.getCurrentName())) {
+            return Double.valueOf(parser.nextTextValue());
+          }
+        }
+        // Sentinel emitted as the price when the response carries no close price.
+        return -1d;
+      }
+    }
+  }
+
+  /**
+   * Join result written to the output stream (serialized with JsonSerdeV2) as
+   * {@code {"symbol": ..., "close": ...}}.
+   */
+  static class StockPrice implements Serializable {
+
+    public final String symbol;
+    public final Double close;
+
+    public StockPrice(
+        @JsonProperty("symbol") String symbol,
+        @JsonProperty("close") Double close) {
+      this.symbol = symbol;
+      this.close = close;
+    }
+  }
+
+}