Repository: samza-hello-samza
Updated Branches:
  refs/heads/latest cffd9fef6 -> a5e5e56ba


SAMZA-1810: Added sample for remote table

Added a sample that demonstrates the usage of remote table backed by RESTful 
service.

Author: Wei Song <ws...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>

Closes #34 from weisong44/latest


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/a5e5e56b
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/a5e5e56b
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/a5e5e56b

Branch: refs/heads/latest
Commit: a5e5e56ba05163d4d20345079587568f7a6aba4b
Parents: cffd9fe
Author: Wei Song <ws...@linkedin.com>
Authored: Thu Aug 16 19:00:10 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Thu Aug 16 19:00:10 2018 -0700

----------------------------------------------------------------------
 bin/deploy.sh                                   |   2 +-
 gradle.properties                               |   2 +-
 pom.xml                                         |   9 +-
 .../config/stock-price-table-joiner.properties  |  35 ++++
 .../cookbook/StockPriceTableJoiner.java         | 173 +++++++++++++++++++
 5 files changed, 217 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a5e5e56b/bin/deploy.sh
----------------------------------------------------------------------
diff --git a/bin/deploy.sh b/bin/deploy.sh
index d65b525..5b50079 100755
--- a/bin/deploy.sh
+++ b/bin/deploy.sh
@@ -23,4 +23,4 @@ base_dir=`pwd`
 
 mvn clean package
 mkdir -p $base_dir/deploy/samza
-tar -xvf $base_dir/target/hello-samza-0.14.1-SNAPSHOT-dist.tar.gz -C 
$base_dir/deploy/samza
+tar -xvf $base_dir/target/hello-samza-0.15.0-SNAPSHOT-dist.tar.gz -C 
$base_dir/deploy/samza

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a5e5e56b/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 6bc7384..37f8eb7 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-SAMZA_VERSION=0.14.1-SNAPSHOT
+SAMZA_VERSION=0.15.0-SNAPSHOT
 KAFKA_VERSION=0.11.0.2
 HADOOP_VERSION=2.6.1
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a5e5e56b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ddb3939..a41be49 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
 
   <groupId>org.apache.samza</groupId>
   <artifactId>hello-samza</artifactId>
-  <version>0.14.1-SNAPSHOT</version>
+  <version>0.15.0-SNAPSHOT</version>
   <packaging>jar</packaging>
   <name>Samza Example</name>
   <description>
@@ -143,12 +143,17 @@ under the License.
       <artifactId>hadoop-yarn-common</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>17.0</version>
+    </dependency>
   </dependencies>
 
   <properties>
     <!-- maven specific properties -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <samza.version>0.14.1-SNAPSHOT</samza.version>
+    <samza.version>0.15.0-SNAPSHOT</samza.version>
     <hadoop.version>2.6.1</hadoop.version>
   </properties>
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a5e5e56b/src/main/config/stock-price-table-joiner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/stock-price-table-joiner.properties 
b/src/main/config/stock-price-table-joiner.properties
new file mode 100644
index 0000000..f9bd684
--- /dev/null
+++ b/src/main/config/stock-price-table-joiner.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=stock-price-table-joiner
+job.container.count=1
+job.default.system=kafka
+job.coordinator.system=kafka
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.StockPriceTableJoiner
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a5e5e56b/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java 
b/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java
new file mode 100644
index 0000000..cb735d2
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.caching.CachingTableDescriptor;
+import org.apache.samza.table.remote.RemoteTableDescriptor;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.HttpUtil;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * In this example, we join a stream of stock symbols with a remote table 
backed by a RESTful service,
+ * which delivers latest stock quotes. The join results contain stock symbol 
and latest price, and are
+ * delivered to an output stream.
+ *
+ * A rate limit of 10 requests/second is set for the entire job; internally, Samza uses an embedded
+ * rate limiter, which evenly distributes the total rate limit among tasks.
+ *
+ * A caching table is used over the remote table with a read TTL of 5 seconds; therefore one would
+ * receive the same quote within this time span.
+ *
+ * <p> Concepts covered: remote table, rate limiter, caching table, stream to 
table joins.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ *   <li>
+ *     Create the Kafka topics "stock-symbol-input" and "stock-price-output" <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh  --zookeeper localhost:2181 --create 
--topic stock-symbol-input --partitions 2 --replication-factor 1
+ *     ./deploy/kafka/bin/kafka-topics.sh  --zookeeper localhost:2181 --create 
--topic stock-price-output --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh 
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory 
--config-path=file://$PWD/deploy/samza/config/stock-price-table-joiner.properties
+ *   </li>
+ *   <li>
+ *     Consume messages from the output topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server 
localhost:9092 --topic stock-price-output
+ *   </li>
+ *   <li>
+ *     Produce some messages to the input topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic stock-symbol-input 
--broker-list localhost:9092
+ *
+ *     After the console producer is started, type
+ *     MSFT
+ *
+ *     You should see messages like below from the console consumer window
+ *     {"symbol":"MSFT","close":107.64}
+ *
+ *     Note: you will need a free API key for symbols other than MSFT, see 
below for more information.
+ *   </li>
+ * </ol>
+ *
+ */
+public class StockPriceTableJoiner implements StreamApplication {
+
+  /**
+   * Default API key "demo" only works for symbol "MSFT"; however you can get 
an
+   * API key for free at https://www.alphavantage.co/, which will work for 
other symbols.
+   */
+  private static final String API_KEY = "demo";
+
+  private static final String URL_TEMPLATE =
+      
"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol=%s&apikey=";
 + API_KEY;
+
+  private static final String INPUT_TOPIC = "stock-symbol-input";
+  private static final String OUTPUT_TOPIC = "stock-price-output";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    Table remoteTable = graph.getTable(new 
RemoteTableDescriptor("remote-table")
+        .withReadRateLimit(10)
+        .withReadFunction(new StockPriceReadFunction()));
+
+    Table table = graph.getTable(new CachingTableDescriptor("table")
+        .withTable(remoteTable)
+        .withReadTtl(Duration.ofSeconds(5)));
+
+    OutputStream<StockPrice> joinResultStream = graph.getOutputStream(
+        OUTPUT_TOPIC, new JsonSerdeV2<>(StockPrice.class));
+
+    graph.getInputStream(INPUT_TOPIC, new StringSerde())
+        .map(symbol -> new KV<String, Void>(symbol, null))
+        .join(table, new JoinFn())
+        .sendTo(joinResultStream);
+
+  }
+
+  static class JoinFn implements StreamTableJoinFunction<String, KV<String, 
Void>, KV<String, Double>, StockPrice> {
+    @Override
+    public StockPrice apply(KV<String, Void> message, KV<String, Double> 
record) {
+      return record == null ? null : new StockPrice(message.getKey(), 
record.getValue());
+    }
+    @Override
+    public String getMessageKey(KV<String, Void> message) {
+      return message.getKey();
+    }
+    @Override
+    public String getRecordKey(KV<String, Double> record) {
+      return record.getKey();
+    }
+  }
+
+  static class StockPriceReadFunction implements TableReadFunction<String, 
Double> {
+    @Override
+    public CompletableFuture<Double> getAsync(String symbol) {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          URL url = new URL(String.format(URL_TEMPLATE, symbol));
+          String response = HttpUtil.read(url, 5000, new 
ExponentialSleepStrategy());
+          JsonParser parser = new JsonFactory().createJsonParser(response);
+          while (!parser.isClosed()) {
+            if (JsonToken.FIELD_NAME.equals(parser.nextToken()) && "4. 
close".equalsIgnoreCase(parser.getCurrentName())) {
+              return Double.valueOf(parser.nextTextValue());
+            }
+          }
+          return -1d;
+        } catch (Exception ex) {
+          throw new SamzaException(ex);
+        }
+      });
+    }
+  }
+
+  static class StockPrice implements Serializable {
+
+    public final String symbol;
+    public final Double close;
+
+    public StockPrice(
+        @JsonProperty("symbol") String symbol,
+        @JsonProperty("close") Double close) {
+      this.symbol = symbol;
+      this.close = close;
+    }
+  }
+
+}

Reply via email to