Update Cookbook examples Updating the cookbook examples to use the latest API. I verified all the applications except the table examples. Found a bug (https://github.com/apache/samza/pull/684) in the metadatastore with the Table example. Once the fix is in, the table examples need to be verified again.
Also change the name of the examples as we discussed before. Author: xinyuiscool <xinyuliu...@gmail.com> Reviewers: Prateek M <prate...@apache.org> Closes #35 from xinyuiscool/cookbook Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/f9374176 Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/f9374176 Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/f9374176 Branch: refs/heads/master Commit: f9374176802bd4b67ff320fe4a1dfe499ff9eb41 Parents: b6acf19 Author: xinyuiscool <xinyuliu...@gmail.com> Authored: Tue Oct 2 18:41:55 2018 -0700 Committer: xiliu <xi...@linkedin.com> Committed: Tue Oct 2 18:41:55 2018 -0700 ---------------------------------------------------------------------- src/main/config/filter-example.properties | 28 +++ src/main/config/join-example.properties | 28 +++ .../config/pageview-adclick-joiner.properties | 28 --- src/main/config/pageview-filter.properties | 28 --- .../pageview-profile-table-joiner.properties | 27 --- src/main/config/pageview-sessionizer.properties | 28 --- .../config/remote-table-join-example.properties | 27 +++ .../config/session-window-example.properties | 28 +++ .../config/stock-price-table-joiner.properties | 27 --- .../config/stream-table-join-example.properties | 27 +++ .../config/tumbling-pageview-counter.properties | 28 --- .../config/tumbling-window-example.properties | 28 +++ .../samza/examples/cookbook/FilterExample.java | 101 ++++++++++ .../samza/examples/cookbook/JoinExample.java | 163 +++++++++++++++ .../cookbook/PageViewAdClickJoiner.java | 162 --------------- .../examples/cookbook/PageViewFilterApp.java | 102 ---------- .../cookbook/PageViewProfileTableJoiner.java | 160 --------------- .../cookbook/PageViewSessionizerApp.java | 119 ----------- .../cookbook/RemoteTableJoinExample.java | 200 +++++++++++++++++++ .../examples/cookbook/SessionWindowExample.java | 120 +++++++++++ 
.../cookbook/StockPriceTableJoiner.java | 200 ------------------- .../cookbook/StreamTableJoinExample.java | 160 +++++++++++++++ .../cookbook/TumblingPageViewCounterApp.java | 120 ----------- .../cookbook/TumblingWindowExample.java | 121 +++++++++++ 24 files changed, 1031 insertions(+), 1029 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/filter-example.properties ---------------------------------------------------------------------- diff --git a/src/main/config/filter-example.properties b/src/main/config/filter-example.properties new file mode 100644 index 0000000..20ca0cd --- /dev/null +++ b/src/main/config/filter-example.properties @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=pageview-filter +job.container.count=2 + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.FilterExample +task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/join-example.properties ---------------------------------------------------------------------- diff --git a/src/main/config/join-example.properties b/src/main/config/join-example.properties new file mode 100644 index 0000000..71ff5c0 --- /dev/null +++ b/src/main/config/join-example.properties @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=pageview-adclick-joiner +job.container.count=2 + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.JoinExample +task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/pageview-adclick-joiner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties deleted file mode 100644 index 8764974..0000000 --- a/src/main/config/pageview-adclick-joiner.properties +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.factory.class=org.apache.samza.job.yarn.YarnJobFactory -job.name=pageview-adclick-joiner -job.container.count=2 - -# YARN -yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz - -# Task -app.class=samza.examples.cookbook.PageViewAdClickJoiner -task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/pageview-filter.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties deleted file mode 100644 index 84228fa..0000000 --- a/src/main/config/pageview-filter.properties +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.factory.class=org.apache.samza.job.yarn.YarnJobFactory -job.name=pageview-filter -job.container.count=2 - -# YARN -yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz - -# Task -app.class=samza.examples.cookbook.PageViewFilterApp -task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/pageview-profile-table-joiner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-profile-table-joiner.properties b/src/main/config/pageview-profile-table-joiner.properties deleted file mode 100644 index d8c0fcf..0000000 --- a/src/main/config/pageview-profile-table-joiner.properties +++ /dev/null @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.factory.class=org.apache.samza.job.yarn.YarnJobFactory -job.name=pageview-profile-table-joiner -job.container.count=2 - -# YARN -yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz - -# Task -app.class=samza.examples.cookbook.PageViewProfileTableJoiner http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/pageview-sessionizer.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties deleted file mode 100644 index 74109ad..0000000 --- a/src/main/config/pageview-sessionizer.properties +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.factory.class=org.apache.samza.job.yarn.YarnJobFactory -job.name=pageview-sessionizer -job.container.count=2 - -# YARN -yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz - -# Task -app.class=samza.examples.cookbook.PageViewSessionizerApp -task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/remote-table-join-example.properties ---------------------------------------------------------------------- diff --git a/src/main/config/remote-table-join-example.properties b/src/main/config/remote-table-join-example.properties new file mode 100644 index 0000000..62c7580 --- /dev/null +++ b/src/main/config/remote-table-join-example.properties @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=stock-price-table-joiner +job.container.count=1 + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.RemoteTableJoinExample http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/session-window-example.properties ---------------------------------------------------------------------- diff --git a/src/main/config/session-window-example.properties b/src/main/config/session-window-example.properties new file mode 100644 index 0000000..407cb25 --- /dev/null +++ b/src/main/config/session-window-example.properties @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=pageview-sessionizer +job.container.count=2 + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.SessionWindowExample +task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/stock-price-table-joiner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/stock-price-table-joiner.properties b/src/main/config/stock-price-table-joiner.properties deleted file mode 100644 index 410cdd4..0000000 --- a/src/main/config/stock-price-table-joiner.properties +++ /dev/null @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.factory.class=org.apache.samza.job.yarn.YarnJobFactory -job.name=stock-price-table-joiner -job.container.count=1 - -# YARN -yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz - -# Task -app.class=samza.examples.cookbook.StockPriceTableJoiner http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/stream-table-join-example.properties ---------------------------------------------------------------------- diff --git a/src/main/config/stream-table-join-example.properties b/src/main/config/stream-table-join-example.properties new file mode 100644 index 0000000..8108df1 --- /dev/null +++ b/src/main/config/stream-table-join-example.properties @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=pageview-profile-table-joiner +job.container.count=2 + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.StreamTableJoinExample http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/tumbling-pageview-counter.properties ---------------------------------------------------------------------- diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties deleted file mode 100644 index 70ca290..0000000 --- a/src/main/config/tumbling-pageview-counter.properties +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.factory.class=org.apache.samza.job.yarn.YarnJobFactory -job.name=tumbling-pageview-counter -job.container.count=2 - -# YARN -yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz - -# Task -app.class=samza.examples.cookbook.TumblingPageViewCounterApp -task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/config/tumbling-window-example.properties ---------------------------------------------------------------------- diff --git a/src/main/config/tumbling-window-example.properties b/src/main/config/tumbling-window-example.properties new file mode 100644 index 0000000..992185c --- /dev/null +++ b/src/main/config/tumbling-window-example.properties @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=tumbling-pageview-counter +job.container.count=2 + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +app.class=samza.examples.cookbook.TumblingWindowExample +task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/java/samza/examples/cookbook/FilterExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/FilterExample.java b/src/main/java/samza/examples/cookbook/FilterExample.java new file mode 100644 index 0000000..bcf5b18 --- /dev/null +++ b/src/main/java/samza/examples/cookbook/FilterExample.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package samza.examples.cookbook; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import samza.examples.cookbook.data.PageView; + +import java.util.List; +import java.util.Map; + +/** + * In this example, we demonstrate filtering out some bad events in the stream. + * + * <p>Concepts covered: Using stateless operators on a stream. + * + * To run the below example: + * + * <ol> + * <li> + * Ensure that the topic "pageview-filter-input" is created <br/> + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1 + * </li> + * <li> + * Run the application using the run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/filter-example.properties + * </li> + * <li> + * Produce some messages to the "pageview-filter-input" topic <br/> + * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/> + * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/> + * {"userId": "invalidUserId", "country": "france", "pageId":"facebook.com"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com"} + * </li> + * <li> + * Consume messages from the 
"pageview-filter-output" topic (e.g. bin/kafka-console-consumer.sh) + * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output --property print.key=true + * </li> + * </ol> + */ +public class FilterExample implements StreamApplication { + private static final String KAFKA_SYSTEM_NAME = "kafka"; + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); + + private static final String INPUT_STREAM_ID = "pageview-filter-input"; + private static final String OUTPUT_STREAM_ID = "pageview-filter-output"; + private static final String INVALID_USER_ID = "invalidUserId"; + + @Override + public void describe(StreamApplicationDescriptor appDescriptor) { + KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KVSerde<String, PageView> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)); + KafkaInputDescriptor<KV<String, PageView>> inputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, serde); + KafkaOutputDescriptor<KV<String, PageView>> outputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde); + + appDescriptor.withDefaultSystem(kafkaSystemDescriptor); + + MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(inputDescriptor); + OutputStream<KV<String, PageView>> filteredPageViews = appDescriptor.getOutputStream(outputDescriptor); + + pageViews + .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId)) + .sendTo(filteredPageViews); + } +} 
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/java/samza/examples/cookbook/JoinExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/JoinExample.java b/src/main/java/samza/examples/cookbook/JoinExample.java new file mode 100644 index 0000000..05a358d --- /dev/null +++ b/src/main/java/samza/examples/cookbook/JoinExample.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package samza.examples.cookbook; + +import java.io.Serializable; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import samza.examples.cookbook.data.AdClick; +import samza.examples.cookbook.data.PageView; + +import java.time.Duration; +import java.util.List; +import java.util.Map; + +/** + * In this example, we join a stream of Page views with a stream of Ad clicks. For instance, this is helpful for + * analysis on what pages served an Ad that was clicked. + * + * <p> Concepts covered: Performing stream to stream Joins. 
+ * + * To run the below example: + * + * <ol> + * <li> + * Ensure that the topics "pageview-join-input", "adclick-join-input" are created <br/> + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1 + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic adclick-join-input --partitions 2 --replication-factor 1 + * </li> + * <li> + * Run the application using the run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/join-example.properties + * </li> + * <li> + * Produce some messages to the "pageview-join-input" topic <br/> + * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/> + * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com"} + * </li> + * <li> + * Produce some messages to the "adclick-join-input" topic with the same pageKey <br/> + * ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/> + * {"userId": "user1", "adId": "adClickId1", "pageId":"google.com"} <br/> + * {"userId": "user1", "adId": "adClickId2", "pageId":"yahoo.com"} + * </li> + * <li> + * Consume messages from the "pageview-adclick-join-output" topic <br/> + * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output --property print.key=true + * </li> + * </ol> + * + */ +public class JoinExample implements StreamApplication, Serializable { + private static final String KAFKA_SYSTEM_NAME = "kafka"; + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private 
static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); + + private static final String PAGEVIEW_STREAM_ID = "pageview-join-input"; + private static final String ADCLICK_STREAM_ID = "adclick-join-input"; + private static final String OUTPUT_STREAM_ID = "pageview-adclick-join-output"; + + @Override + public void describe(StreamApplicationDescriptor appDescriptor) { + StringSerde stringSerde = new StringSerde(); + JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class); + JsonSerdeV2<AdClick> adClickSerde = new JsonSerdeV2<>(AdClick.class); + JsonSerdeV2<JoinResult> joinResultSerde = new JsonSerdeV2<>(JoinResult.class); + + KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KafkaInputDescriptor<PageView> pageViewInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(PAGEVIEW_STREAM_ID, pageViewSerde); + KafkaInputDescriptor<AdClick> adClickInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(ADCLICK_STREAM_ID, adClickSerde); + KafkaOutputDescriptor<JoinResult> joinResultOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, joinResultSerde); + + appDescriptor.withDefaultSystem(kafkaSystemDescriptor); + + MessageStream<PageView> pageViews = appDescriptor.getInputStream(pageViewInputDescriptor); + MessageStream<AdClick> adClicks = appDescriptor.getInputStream(adClickInputDescriptor); + OutputStream<JoinResult> joinResults = appDescriptor.getOutputStream(joinResultOutputDescriptor); + + JoinFunction<String, PageView, AdClick, JoinResult> pageViewAdClickJoinFunction = + new JoinFunction<String, PageView, AdClick, JoinResult>() { + @Override + public JoinResult apply(PageView pageView, AdClick adClick) { + return new JoinResult(pageView.pageId, 
pageView.userId, pageView.country, adClick.getAdId()); + } + + @Override + public String getFirstKey(PageView pageView) { + return pageView.pageId; + } + + @Override + public String getSecondKey(AdClick adClick) { + return adClick.getPageId(); + } + }; + + MessageStream<PageView> repartitionedPageViews = + pageViews + .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde), "pageview") + .map(KV::getValue); + + MessageStream<AdClick> repartitionedAdClicks = + adClicks + .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde), "adclick") + .map(KV::getValue); + + repartitionedPageViews + .join(repartitionedAdClicks, pageViewAdClickJoinFunction, + stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3), "join") + .sendTo(joinResults); + } + + static class JoinResult { + public String pageId; + public String userId; + public String country; + public String adId; + + public JoinResult(String pageId, String userId, String country, String adId) { + this.pageId = pageId; + this.userId = userId; + this.country = country; + this.adId = adId; + } + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java deleted file mode 100644 index 4c5d86b..0000000 --- a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package samza.examples.cookbook; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import samza.examples.cookbook.data.AdClick; -import samza.examples.cookbook.data.PageView; - -import java.time.Duration; -import java.util.List; -import java.util.Map; - -/** - * In this example, we join a stream of Page views with a stream of Ad clicks. For instance, this is helpful for - * analysis on what pages served an Ad that was clicked. - * - * <p> Concepts covered: Performing stream to stream Joins. 
- * - * To run the below example: - * - * <ol> - * <li> - * Ensure that the topics "pageview-join-input", "adclick-join-input" are created <br/> - * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1 - * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic adclick-join-input --partitions 2 --replication-factor 1 - * </li> - * <li> - * Run the application using the run-app.sh script <br/> - * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-adclick-joiner.properties - * </li> - * <li> - * Produce some messages to the "pageview-join-input" topic <br/> - * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/> - * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/> - * {"userId": "user2", "country": "china", "pageId":"yahoo.com"} - * </li> - * <li> - * Produce some messages to the "adclick-join-input" topic with the same pageKey <br/> - * ./deploy/kafka/bin/kafka-console-producer.sh --topic adclick-join-input --broker-list localhost:9092 <br/> - * {"userId": "user1", "adId": "adClickId1", "pageId":"google.com"} <br/> - * {"userId": "user1", "adId": "adClickId2", "pageId":"yahoo.com"} - * </li> - * <li> - * Consume messages from the "pageview-adclick-join-output" topic <br/> - * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-adclick-join-output --property print.key=true - * </li> - * </ol> - * - */ -public class PageViewAdClickJoiner implements StreamApplication { - private static final String KAFKA_SYSTEM_NAME = "kafka"; - private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); - private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); - 
private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); - - private static final String PAGEVIEW_STREAM_ID = "pageview-join-input"; - private static final String ADCLICK_STREAM_ID = "adclick-join-input"; - private static final String OUTPUT_STREAM_ID = "pageview-adclick-join-output"; - - @Override - public void describe(StreamApplicationDescriptor appDescriptor) { - StringSerde stringSerde = new StringSerde(); - JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class); - JsonSerdeV2<AdClick> adClickSerde = new JsonSerdeV2<>(AdClick.class); - JsonSerdeV2<JoinResult> joinResultSerde = new JsonSerdeV2<>(JoinResult.class); - - KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) - .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) - .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) - .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); - - KafkaInputDescriptor<PageView> pageViewInputDescriptor = - kafkaSystemDescriptor.getInputDescriptor(PAGEVIEW_STREAM_ID, pageViewSerde); - KafkaInputDescriptor<AdClick> adClickInputDescriptor = - kafkaSystemDescriptor.getInputDescriptor(ADCLICK_STREAM_ID, adClickSerde); - KafkaOutputDescriptor<JoinResult> joinResultOutputDescriptor = - kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, joinResultSerde); - - appDescriptor.withDefaultSystem(kafkaSystemDescriptor); - - MessageStream<PageView> pageViews = appDescriptor.getInputStream(pageViewInputDescriptor); - MessageStream<AdClick> adClicks = appDescriptor.getInputStream(adClickInputDescriptor); - OutputStream<JoinResult> joinResults = appDescriptor.getOutputStream(joinResultOutputDescriptor); - - JoinFunction<String, PageView, AdClick, JoinResult> pageViewAdClickJoinFunction = - new JoinFunction<String, PageView, AdClick, JoinResult>() { - @Override - public JoinResult apply(PageView pageView, AdClick adClick) { - return new 
JoinResult(pageView.pageId, pageView.userId, pageView.country, adClick.getAdId()); - } - - @Override - public String getFirstKey(PageView pageView) { - return pageView.pageId; - } - - @Override - public String getSecondKey(AdClick adClick) { - return adClick.getPageId(); - } - }; - - MessageStream<PageView> repartitionedPageViews = - pageViews - .partitionBy(pv -> pv.pageId, pv -> pv, KVSerde.of(stringSerde, pageViewSerde), "pageview") - .map(KV::getValue); - - MessageStream<AdClick> repartitionedAdClicks = - adClicks - .partitionBy(AdClick::getPageId, ac -> ac, KVSerde.of(stringSerde, adClickSerde), "adclick") - .map(KV::getValue); - - repartitionedPageViews - .join(repartitionedAdClicks, pageViewAdClickJoinFunction, - stringSerde, pageViewSerde, adClickSerde, Duration.ofMinutes(3), "join") - .sendTo(joinResults); - } - - static class JoinResult { - public String pageId; - public String userId; - public String country; - public String adId; - - public JoinResult(String pageId, String userId, String country, String adId) { - this.pageId = pageId; - this.userId = userId; - this.country = country; - this.adId = adId; - } - } -} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/java/samza/examples/cookbook/PageViewFilterApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java deleted file mode 100644 index e131a8f..0000000 --- a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package samza.examples.cookbook; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import samza.examples.cookbook.data.PageView; - -import java.util.List; -import java.util.Map; - -/** - * In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream. - * - * <p>Concepts covered: Using stateless operators on a stream, Re-partitioning a stream. 
- * - * To run the below example: - * - * <ol> - * <li> - * Ensure that the topic "pageview-filter-input" is created <br/> - * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1 - * </li> - * <li> - * Run the application using the run-app.sh script <br/> - * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-filter.properties - * </li> - * <li> - * Produce some messages to the "pageview-filter-input" topic <br/> - * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092 <br/> - * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/> - * {"userId": "invalidUserId", "country": "france", "pageId":"facebook.com"} <br/> - * {"userId": "user2", "country": "china", "pageId":"yahoo.com"} - * </li> - * <li> - * Consume messages from the "pageview-filter-output" topic (e.g. 
bin/kafka-console-consumer.sh) - * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output --property print.key=true - * </li> - * </ol> - */ -public class PageViewFilterApp implements StreamApplication { - private static final String KAFKA_SYSTEM_NAME = "kafka"; - private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); - private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); - private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); - - private static final String INPUT_STREAM_ID = "pageview-filter-input"; - private static final String OUTPUT_STREAM_ID = "pageview-filter-output"; - private static final String INVALID_USER_ID = "invalidUserId"; - - @Override - public void describe(StreamApplicationDescriptor appDescriptor) { - KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) - .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) - .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) - .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); - - KVSerde<String, PageView> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)); - KafkaInputDescriptor<KV<String, PageView>> inputDescriptor = - kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, serde); - KafkaOutputDescriptor<KV<String, PageView>> outputDescriptor = - kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde); - - appDescriptor.withDefaultSystem(kafkaSystemDescriptor); - - MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(inputDescriptor); - OutputStream<KV<String, PageView>> filteredPageViews = appDescriptor.getOutputStream(outputDescriptor); - - pageViews - .partitionBy(kv -> kv.value.userId, kv -> kv.value, "pageview") - .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId)) - 
.sendTo(filteredPageViews); - } -} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java b/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java deleted file mode 100644 index f67e9c1..0000000 --- a/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package samza.examples.cookbook; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.functions.StreamTableJoinFunction; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.storage.kv.RocksDbTableDescriptor; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; -import org.apache.samza.table.Table; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import samza.examples.cookbook.data.PageView; -import samza.examples.cookbook.data.Profile; - -import java.util.List; -import java.util.Map; - -/** - * In this example, we join a stream of Page views with a table of user profiles, which is populated from an - * user profile stream. For instance, this is helpful for analysis that required additional information from - * user's profile. - * - * <p> Concepts covered: Performing stream-to-table joins. 
- * - * To run the below example: - * - * <ol> - * <li> - * Ensure that the topics "pageview-join-input", "profile-table-input" are created <br/> - * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1 - * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic profile-table-input --partitions 2 --replication-factor 1 - * </li> - * <li> - * Run the application using the run-app.sh script <br/> - * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-profile-table-joiner.properties - * </li> - * <li> - * Produce some messages to the "profile-table-input" topic with the same userId <br/> - * ./deploy/kafka/bin/kafka-console-producer.sh --topic profile-table-input --broker-list localhost:9092 <br/> - * {"userId": "user1", "company": "LNKD"} <br/> - * {"userId": "user2", "company": "MSFT"} - * </li> - * <li> - * Produce some messages to the "pageview-join-input" topic <br/> - * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/> - * {"userId": "user1", "country": "india", "pageId":"google.com"} <br/> - * {"userId": "user2", "country": "china", "pageId":"yahoo.com"} - * </li> - * <li> - * Consume messages from the "enriched-pageview-join-output" topic <br/> - * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic enriched-pageview-join-output - * </li> - * </ol> - * - */ -public class PageViewProfileTableJoiner implements StreamApplication { - private static final String KAFKA_SYSTEM_NAME = "kafka"; - private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); - private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); - private static final Map<String, String> 
KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); - - private static final String PROFILE_STREAM_ID = "profile-table-input"; - private static final String PAGEVIEW_STREAM_ID = "pageview-join-input"; - private static final String OUTPUT_TOPIC = "enriched-pageview-join-output"; - - @Override - public void describe(StreamApplicationDescriptor appDescriptor) { - Serde<Profile> profileSerde = new JsonSerdeV2<>(Profile.class); - Serde<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class); - Serde<EnrichedPageView> joinResultSerde = new JsonSerdeV2<>(EnrichedPageView.class); - - KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) - .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) - .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) - .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); - - KafkaInputDescriptor<Profile> profileInputDescriptor = - kafkaSystemDescriptor.getInputDescriptor(PROFILE_STREAM_ID, profileSerde); - KafkaInputDescriptor<PageView> pageViewInputDescriptor = - kafkaSystemDescriptor.getInputDescriptor(PAGEVIEW_STREAM_ID, pageViewSerde); - KafkaOutputDescriptor<EnrichedPageView> joinResultOutputDescriptor = - kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_TOPIC, joinResultSerde); - - RocksDbTableDescriptor<String, Profile> profileTableDescriptor = - new RocksDbTableDescriptor<String, Profile>("profile-table", KVSerde.of(new StringSerde(), profileSerde)); - - appDescriptor.withDefaultSystem(kafkaSystemDescriptor); - - MessageStream<Profile> profileStream = appDescriptor.getInputStream(profileInputDescriptor); - MessageStream<PageView> pageViewStream = appDescriptor.getInputStream(pageViewInputDescriptor); - OutputStream<EnrichedPageView> joinResultStream = appDescriptor.getOutputStream(joinResultOutputDescriptor); - Table<KV<String, Profile>> profileTable = appDescriptor.getTable(profileTableDescriptor); - - profileStream - .map(profile -> KV.of(profile.userId, 
profile)) - .sendTo(profileTable); - - pageViewStream - .partitionBy(pv -> pv.userId, pv -> pv, KVSerde.of(new StringSerde(), pageViewSerde), "join") - .join(profileTable, new JoinFn()) - .sendTo(joinResultStream); - } - - private class JoinFn implements StreamTableJoinFunction<String, KV<String, PageView>, KV<String, Profile>, EnrichedPageView> { - @Override - public EnrichedPageView apply(KV<String, PageView> message, KV<String, Profile> record) { - return record == null ? null : - new EnrichedPageView(message.getKey(), record.getValue().company, message.getValue().pageId); - } - @Override - public String getMessageKey(KV<String, PageView> message) { - return message.getKey(); - } - @Override - public String getRecordKey(KV<String, Profile> record) { - return record.getKey(); - } - } - - static public class EnrichedPageView { - - public final String userId; - public final String company; - public final String pageId; - - public EnrichedPageView(String userId, String company, String pageId) { - this.userId = userId; - this.company = company; - this.pageId = pageId; - } - } - -} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java deleted file mode 100644 index fb17974..0000000 --- a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package samza.examples.cookbook; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import samza.examples.cookbook.data.PageView; -import samza.examples.cookbook.data.UserPageViews; - -import java.time.Duration; -import java.util.List; -import java.util.Map; - -/** - * In this example, we group page views by userId into sessions, and compute the number of page views for each user - * session. A session is considered closed when there is no user activity for a 10 second duration. - * - * <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream. 
- * - * To run the below example: - * - * <ol> - * <li> - * Ensure that the topic "pageview-session-input" is created <br/> - * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1 - * </li> - * <li> - * Run the application using the run-app.sh script <br/> - * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-sessionizer.properties - * </li> - * <li> - * Produce some messages to the "pageview-session-input" topic <br/> - * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-session-input --broker-list localhost:9092 <br/> - * {"userId": "user1", "country": "india", "pageId":"google.com/home"} <br/> - * {"userId": "user1", "country": "india", "pageId":"google.com/search"} <br/> - * {"userId": "user2", "country": "china", "pageId":"yahoo.com/home"} <br/> - * {"userId": "user2", "country": "china", "pageId":"yahoo.com/sports"} <br/> - * {"userId": "user1", "country": "india", "pageId":"google.com/news"} <br/> - * {"userId": "user2", "country": "china", "pageId":"yahoo.com/fashion"} - * </li> - * <li> - * Consume messages from the "pageview-session-output" topic (e.g. 
bin/kafka-console-consumer.sh) - * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output --property print.key=true - * </li> - * </ol> - * - */ -public class PageViewSessionizerApp implements StreamApplication { - private static final String KAFKA_SYSTEM_NAME = "kafka"; - private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); - private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); - private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); - - private static final String INPUT_STREAM_ID = "pageview-session-input"; - private static final String OUTPUT_STREAM_ID = "pageview-session-output"; - - @Override - public void describe(StreamApplicationDescriptor appDescriptor) { - Serde<String> stringSerde = new StringSerde(); - Serde<KV<String, PageView>> pageViewKVSerde = KVSerde.of(stringSerde, new JsonSerdeV2<>(PageView.class)); - Serde<KV<String, UserPageViews>> userPageViewSerde = KVSerde.of(stringSerde, new JsonSerdeV2<>(UserPageViews.class)); - - KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) - .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) - .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) - .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); - - KafkaInputDescriptor<KV<String, PageView>> pageViewInputDescriptor = - kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, pageViewKVSerde); - KafkaOutputDescriptor<KV<String, UserPageViews>> userPageViewsOutputDescriptor = - kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, userPageViewSerde); - - appDescriptor.withDefaultSystem(kafkaSystemDescriptor); - - MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(pageViewInputDescriptor); - OutputStream<KV<String, UserPageViews>> userPageViews = 
appDescriptor.getOutputStream(userPageViewsOutputDescriptor); - - pageViews - .partitionBy(kv -> kv.value.userId, kv -> kv.value, "pageview") - .window(Windows.keyedSessionWindow(kv -> kv.value.userId, - Duration.ofSeconds(10), stringSerde, pageViewKVSerde), "usersession") - .map(windowPane -> { - String userId = windowPane.getKey().getKey(); - int views = windowPane.getMessage().size(); - return KV.of(userId, new UserPageViews(userId, views)); - }) - .sendTo(userPageViews); - } -} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java new file mode 100644 index 0000000..386cdda --- /dev/null +++ b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package samza.examples.cookbook; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.io.Serializable; +import java.net.URL; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.samza.SamzaException; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.functions.StreamTableJoinFunction; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.table.Table; +import org.apache.samza.table.caching.CachingTableDescriptor; +import org.apache.samza.table.remote.RemoteTableDescriptor; +import org.apache.samza.table.remote.TableReadFunction; +import org.apache.samza.util.ExponentialSleepStrategy; +import org.apache.samza.util.HttpUtil; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonToken; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * In this example, we join a stream of stock symbols with a remote table backed by a RESTful service, + * which delivers latest stock quotes. The join results contain stock symbol and latest price, and are + * delivered to an output stream. + * + * A rate limit of 10 requests/second is set for the entire job; internally, Samza uses an embedded + * rate limiter, which evenly distributes the total rate limit among tasks.
+ * + * A caching table is used over the remote table with a read TTL of 5 seconds, therefore one would + * receive the same quote within this time span. + * + * <p> Concepts covered: remote table, rate limiter, caching table, stream to table joins. + * + * To run the below example: + * + * <ol> + * <li> + * Create the Kafka topics "stock-symbol-input" and "stock-price-output" <br/> + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic stock-symbol-input --partitions 2 --replication-factor 1 + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic stock-price-output --partitions 2 --replication-factor 1 + * </li> + * <li> + * Run the application using the run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/remote-table-join-example.properties + * </li> + * <li> + * Consume messages from the output topic <br/> + * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stock-price-output + * </li> + * <li> + * Produce some messages to the input topic <br/> + * ./deploy/kafka/bin/kafka-console-producer.sh --topic stock-symbol-input --broker-list localhost:9092 + * + * After the console producer is started, type + * MSFT + * + * You should see messages like below from the console consumer window + * {"symbol":"MSFT","close":107.64} + * + * Note: you will need a free API key for symbols other than MSFT, see below for more information. 
+ * </li> + * </ol> + * + */ +public class RemoteTableJoinExample implements StreamApplication { + private static final String KAFKA_SYSTEM_NAME = "kafka"; + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); + + /** + * Default API key "demo" only works for symbol "MSFT"; however you can get an + * API key for free at https://www.alphavantage.co/, which will work for other symbols. + */ + private static final String API_KEY = "demo"; + + private static final String URL_TEMPLATE = + "https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol=%s&apikey=" + API_KEY; + + private static final String INPUT_STREAM_ID = "stock-symbol-input"; + private static final String OUTPUT_STREAM_ID = "stock-price-output"; + + @Override + public void describe(StreamApplicationDescriptor appDescriptor) { + KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KafkaInputDescriptor<String> stockSymbolInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, new StringSerde()); + KafkaOutputDescriptor<StockPrice> stockPriceOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, new JsonSerdeV2<>(StockPrice.class)); + MessageStream<String> stockSymbolStream = appDescriptor.getInputStream(stockSymbolInputDescriptor); + OutputStream<StockPrice> stockPriceStream = appDescriptor.getOutputStream(stockPriceOutputDescriptor); + + RemoteTableDescriptor<String, Double> remoteTableDescriptor = + new RemoteTableDescriptor("remote-table") + .withReadRateLimit(10) + 
.withReadFunction(new StockPriceReadFunction()); + CachingTableDescriptor<String, Double> cachedRemoteTableDescriptor = + new CachingTableDescriptor<String, Double>("cached-remote-table") + .withTable(remoteTableDescriptor) + .withReadTtl(Duration.ofSeconds(5)); + Table<KV<String, Double>> cachedRemoteTable = appDescriptor.getTable(cachedRemoteTableDescriptor); + + stockSymbolStream + .map(symbol -> new KV<String, Void>(symbol, null)) + .join(cachedRemoteTable, new JoinFn()) + .sendTo(stockPriceStream); + + } + + static class JoinFn implements StreamTableJoinFunction<String, KV<String, Void>, KV<String, Double>, StockPrice> { + @Override + public StockPrice apply(KV<String, Void> message, KV<String, Double> record) { + return record == null ? null : new StockPrice(message.getKey(), record.getValue()); + } + @Override + public String getMessageKey(KV<String, Void> message) { + return message.getKey(); + } + @Override + public String getRecordKey(KV<String, Double> record) { + return record.getKey(); + } + } + + static class StockPriceReadFunction implements TableReadFunction<String, Double> { + @Override + public CompletableFuture<Double> getAsync(String symbol) { + return CompletableFuture.supplyAsync(() -> { + try { + URL url = new URL(String.format(URL_TEMPLATE, symbol)); + String response = HttpUtil.read(url, 5000, new ExponentialSleepStrategy()); + JsonParser parser = new JsonFactory().createJsonParser(response); + while (!parser.isClosed()) { + if (JsonToken.FIELD_NAME.equals(parser.nextToken()) && "4. 
close".equalsIgnoreCase(parser.getCurrentName())) { + return Double.valueOf(parser.nextTextValue()); + } + } + return -1d; + } catch (Exception ex) { + throw new SamzaException(ex); + } + }); + } + + @Override + public boolean isRetriable(Throwable throwable) { + return false; + } + } + + static class StockPrice implements Serializable { + + public final String symbol; + public final Double close; + + public StockPrice( + @JsonProperty("symbol") String symbol, + @JsonProperty("close") Double close) { + this.symbol = symbol; + this.close = close; + } + } + +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/java/samza/examples/cookbook/SessionWindowExample.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/SessionWindowExample.java b/src/main/java/samza/examples/cookbook/SessionWindowExample.java new file mode 100644 index 0000000..bfdf188 --- /dev/null +++ b/src/main/java/samza/examples/cookbook/SessionWindowExample.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package samza.examples.cookbook; + +import java.io.Serializable; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import samza.examples.cookbook.data.PageView; +import samza.examples.cookbook.data.UserPageViews; + +import java.time.Duration; +import java.util.List; +import java.util.Map; + +/** + * In this example, we group page views by userId into sessions, and compute the number of page views for each user + * session. A session is considered closed when there is no user activity for a 10 second duration. + * + * <p>Concepts covered: Using session windows to group data in a stream, Re-partitioning a stream. 
+ * + * To run the below example: + * + * <ol> + * <li> + * Ensure that the topic "pageview-session-input" is created <br/> + * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-session-input --partitions 2 --replication-factor 1 + * </li> + * <li> + * Run the application using the run-app.sh script <br/> + * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/session-window-example.properties + * </li> + * <li> + * Produce some messages to the "pageview-session-input" topic <br/> + * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-session-input --broker-list localhost:9092 <br/> + * {"userId": "user1", "country": "india", "pageId":"google.com/home"} <br/> + * {"userId": "user1", "country": "india", "pageId":"google.com/search"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com/home"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com/sports"} <br/> + * {"userId": "user1", "country": "india", "pageId":"google.com/news"} <br/> + * {"userId": "user2", "country": "china", "pageId":"yahoo.com/fashion"} + * </li> + * <li> + * Consume messages from the "pageview-session-output" topic (e.g. 
bin/kafka-console-consumer.sh) + * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-session-output --property print.key=true + * </li> + * </ol> + * + */ +public class SessionWindowExample implements StreamApplication, Serializable { + private static final String KAFKA_SYSTEM_NAME = "kafka"; + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); + + private static final String INPUT_STREAM_ID = "pageview-session-input"; + private static final String OUTPUT_STREAM_ID = "pageview-session-output"; + + @Override + public void describe(StreamApplicationDescriptor appDescriptor) { + Serde<String> stringSerde = new StringSerde(); + KVSerde<String, PageView> pageViewKVSerde = KVSerde.of(stringSerde, new JsonSerdeV2<>(PageView.class)); + KVSerde<String, UserPageViews> userPageViewSerde = KVSerde.of(stringSerde, new JsonSerdeV2<>(UserPageViews.class)); + + KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KafkaInputDescriptor<KV<String, PageView>> pageViewInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, pageViewKVSerde); + KafkaOutputDescriptor<KV<String, UserPageViews>> userPageViewsOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, userPageViewSerde); + + appDescriptor.withDefaultSystem(kafkaSystemDescriptor); + + MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(pageViewInputDescriptor); + OutputStream<KV<String, UserPageViews>> userPageViews = 
appDescriptor.getOutputStream(userPageViewsOutputDescriptor); + + pageViews + .partitionBy(kv -> kv.value.userId, kv -> kv.value, pageViewKVSerde, "pageview") + .window(Windows.keyedSessionWindow(kv -> kv.value.userId, + Duration.ofSeconds(10), stringSerde, pageViewKVSerde), "usersession") + .map(windowPane -> { + String userId = windowPane.getKey().getKey(); + int views = windowPane.getMessage().size(); + return KV.of(userId, new UserPageViews(userId, views)); + }) + .sendTo(userPageViews); + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f9374176/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java b/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java deleted file mode 100644 index 3aa951e..0000000 --- a/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package samza.examples.cookbook; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -import java.io.Serializable; -import java.net.URL; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import org.apache.samza.SamzaException; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.functions.StreamTableJoinFunction; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaOutputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; -import org.apache.samza.table.Table; -import org.apache.samza.table.caching.CachingTableDescriptor; -import org.apache.samza.table.remote.RemoteTableDescriptor; -import org.apache.samza.table.remote.TableReadFunction; -import org.apache.samza.util.ExponentialSleepStrategy; -import org.apache.samza.util.HttpUtil; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; -import org.codehaus.jackson.annotate.JsonProperty; - -/** - * In this example, we join a stream of stock symbols with a remote table backed by a RESTful service, - * which delivers latest stock quotes. The join results contain stock symbol and latest price, and are - * delivered to an output stream. - * - * A rate limit of 10 requests/second is set of the entire job, internally Samza uses an embedded - * rate limiter, which evenly distributes the total rate limit among tasks. 
- * - * A caching table is used over the remote table with a read TTL of 5 seconds, therefore one would - * receive the same quote with this time span. - * - * <p> Concepts covered: remote table, rate limiter, caching table, stream to table joins. - * - * To run the below example: - * - * <ol> - * <li> - * Create Kafka topics "stock-symbol-input", "stock-price-output" are created <br/> - * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic stock-symbol-input --partitions 2 --replication-factor 1 - * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic stock-price-output --partitions 2 --replication-factor 1 - * </li> - * <li> - * Run the application using the run-app.sh script <br/> - * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/stock-price-table-joiner.properties - * </li> - * <li> - * Consume messages from the output topic <br/> - * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stock-price-output - * </li> - * <li> - * Produce some messages to the input topic <br/> - * ./deploy/kafka/bin/kafka-console-producer.sh --topic stock-symbol-input --broker-list localhost:9092 - * - * After the console producer is started, type - * MSFT - * - * You should see messages like below from the console consumer window - * {"symbol":"MSFT","close":107.64} - * - * Note: you will need a free API key for symbols other than MSFT, see below for more information. 
- * </li> - * </ol> - * - */ -public class StockPriceTableJoiner implements StreamApplication { - private static final String KAFKA_SYSTEM_NAME = "kafka"; - private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); - private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); - private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); - - /** - * Default API key "demo" only works for symbol "MSFT"; however you can get an - * API key for free at https://www.alphavantage.co/, which will work for other symbols. - */ - private static final String API_KEY = "demo"; - - private static final String URL_TEMPLATE = - "https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol=%s&apikey=" + API_KEY; - - private static final String INPUT_STREAM_ID = "stock-symbol-input"; - private static final String OUTPUT_STREAM_ID = "stock-price-output"; - - @Override - public void describe(StreamApplicationDescriptor appDescriptor) { - KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) - .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) - .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) - .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); - - KafkaInputDescriptor<String> stockSymbolInputDescriptor = - kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, new StringSerde()); - KafkaOutputDescriptor<StockPrice> stockPriceOutputDescriptor = - kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, new JsonSerdeV2<>(StockPrice.class)); - MessageStream<String> stockSymbolStream = appDescriptor.getInputStream(stockSymbolInputDescriptor); - OutputStream<StockPrice> stockPriceStream = appDescriptor.getOutputStream(stockPriceOutputDescriptor); - - RemoteTableDescriptor<String, Double> remoteTableDescriptor = - new RemoteTableDescriptor("remote-table") - .withReadRateLimit(10) - 
.withReadFunction(new StockPriceReadFunction()); - CachingTableDescriptor<String, Double> cachedRemoteTableDescriptor = - new CachingTableDescriptor<String, Double>("cached-remote-table") - .withTable(remoteTableDescriptor) - .withReadTtl(Duration.ofSeconds(5)); - Table<KV<String, Double>> cachedRemoteTable = appDescriptor.getTable(cachedRemoteTableDescriptor); - - stockSymbolStream - .map(symbol -> new KV<String, Void>(symbol, null)) - .join(cachedRemoteTable, new JoinFn()) - .sendTo(stockPriceStream); - - } - - static class JoinFn implements StreamTableJoinFunction<String, KV<String, Void>, KV<String, Double>, StockPrice> { - @Override - public StockPrice apply(KV<String, Void> message, KV<String, Double> record) { - return record == null ? null : new StockPrice(message.getKey(), record.getValue()); - } - @Override - public String getMessageKey(KV<String, Void> message) { - return message.getKey(); - } - @Override - public String getRecordKey(KV<String, Double> record) { - return record.getKey(); - } - } - - static class StockPriceReadFunction implements TableReadFunction<String, Double> { - @Override - public CompletableFuture<Double> getAsync(String symbol) { - return CompletableFuture.supplyAsync(() -> { - try { - URL url = new URL(String.format(URL_TEMPLATE, symbol)); - String response = HttpUtil.read(url, 5000, new ExponentialSleepStrategy()); - JsonParser parser = new JsonFactory().createJsonParser(response); - while (!parser.isClosed()) { - if (JsonToken.FIELD_NAME.equals(parser.nextToken()) && "4. 
close".equalsIgnoreCase(parser.getCurrentName())) { - return Double.valueOf(parser.nextTextValue()); - } - } - return -1d; - } catch (Exception ex) { - throw new SamzaException(ex); - } - }); - } - - @Override - public boolean isRetriable(Throwable throwable) { - return false; - } - } - - static class StockPrice implements Serializable { - - public final String symbol; - public final Double close; - - public StockPrice( - @JsonProperty("symbol") String symbol, - @JsonProperty("close") Double close) { - this.symbol = symbol; - this.close = close; - } - } - -}