Update Event Hubs Example

Update the Event Hubs Connector example for the 1.0 release.
Author: Daniel Chen <dch...@linkedin.com> Reviewers: Prateek Maheshwari Closes #38 from dxichen/latest Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/a096c753 Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/a096c753 Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/a096c753 Branch: refs/heads/master Commit: a096c7533987aa34128d4c61d6ad5b2da05b8039 Parents: 4a73e0d Author: Daniel Chen <dch...@linkedin.com> Authored: Tue Oct 23 12:15:58 2018 -0700 Committer: Prateek Maheshwari <pmaheshw...@apache.org> Committed: Tue Oct 23 12:15:58 2018 -0700 ---------------------------------------------------------------------- bin/run-azure-application.sh | 30 ----------- bin/run-event-hubs-zk-application.sh | 30 +++++++++++ src/main/assembly/src.xml | 2 +- .../azure-application-local-runner.properties | 26 ++-------- .../samza/examples/azure/AzureApplication.java | 53 ++++++++++++++------ 5 files changed, 71 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/bin/run-azure-application.sh ---------------------------------------------------------------------- diff --git a/bin/run-azure-application.sh b/bin/run-azure-application.sh deleted file mode 100755 index 8cd2463..0000000 --- a/bin/run-azure-application.sh +++ /dev/null @@ -1,30 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -home_dir=`pwd` -base_dir=$(dirname $0)/.. -cd $base_dir -base_dir=`pwd` -cd $home_dir - -export EXECUTION_PLAN_DIR="$base_dir/plan" -mkdir -p $EXECUTION_PLAN_DIR - -[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml" - -exec $(dirname $0)/run-class.sh samza.examples.azure.AzureZKLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/azure-application-local-runner.properties http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/bin/run-event-hubs-zk-application.sh ---------------------------------------------------------------------- diff --git a/bin/run-event-hubs-zk-application.sh b/bin/run-event-hubs-zk-application.sh new file mode 100755 index 0000000..8cd2463 --- /dev/null +++ b/bin/run-event-hubs-zk-application.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +home_dir=`pwd` +base_dir=$(dirname $0)/.. +cd $base_dir +base_dir=`pwd` +cd $home_dir + +export EXECUTION_PLAN_DIR="$base_dir/plan" +mkdir -p $EXECUTION_PLAN_DIR + +[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml" + +exec $(dirname $0)/run-class.sh samza.examples.azure.AzureZKLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/azure-application-local-runner.properties http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml index 8f3694e..1614aaf 100644 --- a/src/main/assembly/src.xml +++ b/src/main/assembly/src.xml @@ -49,7 +49,7 @@ <outputDirectory>bin</outputDirectory> </file> <file> - <source>${basedir}/bin/run-azure-application.sh</source> + <source>${basedir}/bin/run-event-hubs-zk-application.sh</source> <outputDirectory>bin</outputDirectory> </file> </files> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/src/main/config/azure-application-local-runner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/azure-application-local-runner.properties b/src/main/config/azure-application-local-runner.properties index e440fd8..3b7618e 100644 --- a/src/main/config/azure-application-local-runner.properties +++ 
b/src/main/config/azure-application-local-runner.properties @@ -21,29 +21,9 @@ job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory job.default.system=eventhubs job.coordinator.zk.connect=localhost:2181 -# Azure EventHubs System -systems.eventhubs.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory -systems.eventhubs.stream.list=output-stream,input-stream - -# Add your EventHubs input stream credentials here -systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE -systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME -systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME -systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN - -# Add your EventHubs output stream credentials here -systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE -systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME -systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME -systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN - -# Azure Table Checkpoint Manager -task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory -azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING +# Define the key and name configurations with property names of your choice, starting with 'sensitive.' 
+sensitive.eventhubs.sas.key.name=my-sas-key-name +sensitive.eventhubs.sas.token=my-sas-token # Task/Application task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory - -# Streams -streams.input-stream.samza.system=eventhubs -streams.output-stream.samza.system=eventhubs http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/src/main/java/samza/examples/azure/AzureApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java index e2c337f..454787f 100644 --- a/src/main/java/samza/examples/azure/AzureApplication.java +++ b/src/main/java/samza/examples/azure/AzureApplication.java @@ -24,39 +24,60 @@ import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.serializers.ByteSerde; -import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.descriptors.GenericInputDescriptor; -import org.apache.samza.system.descriptors.GenericOutputDescriptor; -import org.apache.samza.system.descriptors.GenericSystemDescriptor; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.eventhub.descriptors.EventHubsInputDescriptor; +import org.apache.samza.system.eventhub.descriptors.EventHubsOutputDescriptor; +import org.apache.samza.system.eventhub.descriptors.EventHubsSystemDescriptor; + public class AzureApplication implements StreamApplication { + // Stream names private static final String INPUT_STREAM_ID = "input-stream"; private static final String OUTPUT_STREAM_ID = "output-stream"; + // These properties could be configured here or in azure-application-local-runner.properties + // Keep in mind that the 
.properties file will overwrite properties defined here with Descriptors + private static final String EVENTHUBS_NAMESPACE = "my-eventhubs-namespace"; + + // Upstream and downstream Event Hubs entity names + private static final String EVENTHUBS_INPUT_ENTITY = "my-input-entity"; + private static final String EVENTHUBS_OUTPUT_ENTITY = "my-output-entity"; + + // You may define your own config properties in azure-application-local-runner.properties and retrieve them + // in the StreamApplicationDescriptor. Prefix them with 'sensitive.' to avoid logging them. + private static final String EVENTHUBS_SAS_KEY_NAME_CONFIG = "sensitive.eventhubs.sas.key.name"; + private static final String EVENTHUBS_SAS_KEY_TOKEN_CONFIG = "sensitive.eventhubs.sas.token"; + @Override public void describe(StreamApplicationDescriptor appDescriptor) { - GenericSystemDescriptor systemDescriptor = - new GenericSystemDescriptor("eventhubs", "org.apache.samza.system.eventhub.EventHubSystemFactory"); + // Define your system here + EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs"); - KVSerde<String, byte[]> serde = KVSerde.of(new StringSerde(), new ByteSerde()); + // Choose your serializer/deserializer for the EventData payload + StringSerde serde = new StringSerde(); - GenericInputDescriptor<KV<String, byte[]>> inputDescriptor = - systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, serde); + // Define the input and output descriptors with respective configs + EventHubsInputDescriptor<KV<String, String>> inputDescriptor = + systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, serde) + .withSasKeyName(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_NAME_CONFIG)) + .withSasKey(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_TOKEN_CONFIG)); - GenericOutputDescriptor<KV<String, byte[]>> outputDescriptor = - systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde); + EventHubsOutputDescriptor<KV<String, String>>
outputDescriptor = + systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, serde) + .withSasKeyName(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_NAME_CONFIG)) + .withSasKey(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_TOKEN_CONFIG)); - MessageStream<KV<String, byte[]>> eventhubInput = appDescriptor.getInputStream(inputDescriptor); - OutputStream<KV<String, byte[]>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor); + // Define the input and output streams with descriptors + MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor); + OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor); + // Define the execution flow with the high-level API eventhubInput - .filter((message) -> message.getKey() != null) .map((message) -> { System.out.println("Sending: "); System.out.println("Received Key: " + message.getKey()); - System.out.println("Received Message: " + new String(message.getValue())); + System.out.println("Received Message: " + message.getValue()); return message; }) .sendTo(eventhubOutput);