Cleanup the EventHubs connector section. Use System/Stream Descriptors

Author: Jagadish <jvenkatra...@linkedin.com>

Reviewers: Jagadish<jagad...@apache.org>

Closes #780 from vjagadish1989/website-reorg27

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/299c0317
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/299c0317
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/299c0317

Branch: refs/heads/1.0.0
Commit: 299c031795eaaaa3cf255e02d5fc636a43570e7d
Parents: e4b60e7
Author: Jagadish <jvenkatra...@linkedin.com>
Authored: Mon Oct 29 16:31:11 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Nov 13 19:32:22 2018 -0800

----------------------------------------------------------------------
 .../versioned/connectors/eventhubs.md | 209 +++++++------------
 .../documentation/versioned/connectors/kafka.md | 7 +-
 .../documentation/versioned/jobs/logging.md | 2 +
 3 files changed, 75 insertions(+), 143 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/299c0317/docs/learn/documentation/versioned/connectors/eventhubs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/eventhubs.md b/docs/learn/documentation/versioned/connectors/eventhubs.md
index 16120f3..0be288b 100644
--- a/docs/learn/documentation/versioned/connectors/eventhubs.md
+++ b/docs/learn/documentation/versioned/connectors/eventhubs.md
@@ -19,135 +19,59 @@ title: Event Hubs Connector
 limitations under the License.
 -->
-## Overview
+## EventHubs I/O: QuickStart
-The Samza Event Hubs connector provides access to [Azure Event Hubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features), Microsoft’s data streaming service on Azure. An event hub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data).
+The Samza EventHubs connector provides access to [Azure EventHubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features), Microsoft’s data streaming service on Azure. An eventhub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data).
-You may find an [example](../../../tutorials/versioned/samza-event-hubs-standalone.md) using this connector in the [hello-samza](https://github.com/apache/samza-hello-samza) project.
+The [hello-samza](https://github.com/apache/samza-hello-samza) project includes an [example](../../../tutorials/versioned/samza-event-hubs-standalone.md) of reading and writing to EventHubs.
-## Consuming from Event Hubs
+### Concepts
-Samza’s [EventHubSystemConsumer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java) wraps the EventData into an [EventHubIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java). Samza's Event Hubs consumer wraps each message from Event Hubs into an EventHubMessageEnvelope. The envelope has two fields of interest - the key, which is set to the event's String partition key and the message, which is set to the actual data in the event.
-
-You can describe your Samza jobs to process data from Azure Event Hubs. To set Samza to consume from Event Hubs streams:
+####EventHubsSystemDescriptor
+Samza refers to any IO source (eg: Kafka) it interacts with as a _system_, whose properties are set using a corresponding `SystemDescriptor`. The `EventHubsSystemDescriptor` allows you to configure various properties for the `EventHubsClient` used by Samza.
 {% highlight java %}
- 1 public void describe(StreamApplicationDescriptor appDescriptor) {
- 2 // Define your system here
- 3 EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
- 4
- 5 // Choose your serializer/deserializer for the EventData payload
- 6 StringSerde serde = new StringSerde();
- 7
- 8 // Define the input descriptors with respective descriptors
- 9 EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
-10 systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, serde)
-11 .withSasKeyName(EVENTHUBS_SAS_KEY_NAME)
-12 .withSasKey(EVENTHUBS_SAS_KEY_TOKEN);
-13
-14 // Define the input streams with descriptors
-15 MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
-16
-17 //...
-18 }
-{% endhighlight %}
-
-In the code snippet above, we create the input and output streams that can consume and produce from the configured Event Hubs entities.
-
-1. Line 3: A `EventHubsSystemDescriptor` is created with the name "eventhubs". You may set different system descriptors here.
-2. Line 6: Event Hubs messages are consumed as key value pairs. The [serde](../../documentation/versioned/container/serialization.html) is defined for the value of the incoming payload of the Event Hubs' EventData. You may use any of the serdes that samza ships with out of the box or define your own.
-The serde for the key is not set since it will always the String from the EventData [partitionKey](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__).
-3. Line 8-12: An `EventHubsInputDescriptor` is created with the required descriptors to gain access of the Event Hubs entity (`STREAM_ID`, `EVENTHUBS_NAMESPACE`, `EVENTHUBS_ENTITY`, `EVENTHUBS_SAS_KEY_NAME`, `EVENTHUBS_SAS_KEY_TOKEN`).
-These must be set to the credentials of the entities you wish to connect to.
-4. Line 15: creates an `InputStream` with the previously defined `EventHubsInputDescriptor`.
-
-Alternatively, you can set these properties in the `.properties` file of the application.
-Note: the keys set in the `.properties` file will override the ones set in code with descriptors.
-Refer to the [Event Hubs configuration reference](../../documentation/versioned/jobs/samza-configurations.html#eventhubs) for the complete list of configurations.
-
-{% highlight jproperties %}
-# define an event hub system factory with your identifier. eg: eh-system
-systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
-
-# define your streams
-systems.eh-system.stream.list=eh-input-stream
-streams.eh-stream.samza.system=eh-system
-
-# define required properties for your streams
-streams.eh-input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-streams.eh-input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-streams.eh-input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-streams.eh-input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+ 1 EventHubsSystemDescriptor eventHubsSystemDescriptor = new EventHubsSystemDescriptor("eventhubs").withNumClientThreads(5);
 {% endhighlight %}
-It is required to provide values for YOUR-STREAM-NAMESPACE, YOUR-ENTITY-NAME, YOUR-SAS-KEY-NAME, YOUR-SAS-KEY-TOKEN to read or write to the stream.
-
-## Producing to Event Hubs
+####EventHubsInputDescriptor
-Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) from Samza is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the message in the envelope. Additionally, the key and the produce timestamp are set as properties in the EventData before sending it to Event Hubs.
-Similarly, you can also configure your Samza job to write to Event Hubs. Follow the same descriptors defined in the Consuming from Event Hubs section to write to Event Hubs:
+The EventHubsInputDescriptor allows you to specify the properties of each EventHubs stream your application should read from. For each of your input streams, you should create a corresponding instance of EventHubsInputDescriptor by providing a topic-name and a serializer.
 {% highlight java %}
- 1 public void describe(StreamApplicationDescriptor appDescriptor) {
- 2 // Define your system here
- 3 EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
- 4
- 5 // Choose your serializer/deserializer for the EventData payload
- 6 StringSerde serde = new StringSerde();
- 7
- 8 // Define the input and output descriptors with respective descriptors
- 9 EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
-10 systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, serde)
-11 .withSasKeyName(EVENTHUBS_SAS_KEY_NAME)
-12 .withSasKey(EVENTHUBS_SAS_KEY_TOKEN);
-13
-14 // Define the output streams with descriptors
-15 OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
-16
-17 //...
-18 }
+ EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
+ systemDescriptor.getInputDescriptor(streamId, "eventhubs-namespace", "eventhubs-name", new StringSerde())
+ .withSasKeyName("secretkey")
+ .withSasKey("sasToken-123")
+ .withConsumerGroup("$notdefault");
 {% endhighlight %}
-In the code snippet above, we create the input and output streams that can consume and produce from the configured Event Hubs entities.
+By default, messages are sent and received as byte arrays. Samza then de-serializes them to typed objects using your provided Serde. For example, the above uses a `StringSerde` to de-serialize messages.
-1. Line 3: A `EventHubsSystemDescriptor` is created with the name "eventhubs". You may set different system descriptors here.
-2. Line 6: Event Hubs messages are produced as key value pairs. The [serde](../../documentation/versioned/container/serialization.html) is defined for the value of the payload of the outgoing Event Hubs' EventData. You may use any of the serdes that samza ships with out of the box or define your own.
-The serde for the key is not set since it will always the String from the EventData [partitionKey](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__).
-3. Line 9-12: An `EventHubsOutputDescriptor` is created with the required descriptors to gain access of the Event Hubs entity (`STREAM_ID`, `EVENTHUBS_NAMESPACE`, `EVENTHUBS_ENTITY`, `EVENTHUBS_SAS_KEY_NAME`, `EVENTHUBS_SAS_KEY_TOKEN`).
-These must be set to the credentials of the entities you wish to connect to.
-4. Line 15: creates an `OutputStream` with the previously defined `EventHubsOutputDescriptor`.
-Alternatively, you can set these properties in the `.properties` file of the application.
-Note: the keys set in the `.properties` file will override the ones set in code with descriptors.
-Refer to the [Event Hubs configuration reference](../../documentation/versioned/jobs/samza-configurations.html#eventhubs) for the complete list of configurations.
+####EventHubsOutputDescriptor
-{% highlight jproperties %}
-# define an event hub system factory with your identifier. eg: eh-system
-systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+Similarly, the `EventHubsOutputDescriptor` allows you to specify the output streams for your application. For each output stream you write to in EventHubs, you should create an instance of `EventHubsOutputDescriptor`.
-# define your streams
-systems.eh-system.stream.list=eh-output-stream
-streams.eh-stream.samza.system=eh-system
-
-streams.eh-output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-streams.eh-output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-streams.eh-output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-streams.eh-output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+{% highlight java %}
+ EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
+ systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, new StringSerde();)
+ .withSasKeyName(..)
+ .withSasKey(..);
 {% endhighlight %}
-Then you can consume and produce a message to Event Hubs in your code as below:
+####Security Model
+Each EventHubs stream is scoped to a container called a _namespace_, which uniquely identifies an EventHubs in a region. EventHubs's [security model](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-authentication-and-security-model-overview) is based on Shared Access Signatures(SAS).
+Hence, you should also provide your SAS keys and tokens to access the stream. You can generate your SAS tokens using the
-{% highlight java %}
-MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
-OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
-eventhubInput.sendTo(eventhubOutput)
-{% endhighlight %}
+####Data Model
+Each event produced and consumed from an EventHubs stream is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data), which wraps a byte-array payload. When producing to EventHubs, Samza serializes your object into an `EventData` payload before sending it over the wire. Likewise, when consuming messages from EventHubs, messages are de-serialized into typed objects using the provided Serde.
-## Advanced configuration
+## Configuration
 ###Producer partitioning
-The partition.method property determines how outgoing messages are partitioned. Valid values for this config are EVENT\_HUB\_HASHING, PARTITION\_KEY\_AS_PARTITION or ROUND\_ROBIN.
+You can use `#withPartitioningMethod` to control how outgoing messages are partitioned. The following partitioning schemes are supported:
 1. EVENT\_HUB\_HASHING: By default, Samza computes the partition for an outgoing message based on the hash of its partition-key. This ensures that events with the same key are sent to the same partition. If this option is chosen, the partition key should be a string. If the partition key is not set, the key in the message is used for partitioning.
@@ -155,22 +79,16 @@ The partition.method property determines how outgoing messages are partitioned.
 3. ROUND\_ROBIN: In this method, outgoing messages are distributed in a round-robin across all partitions. The key and the partition key in the message are ignored.
-##### Using descriptors
 {% highlight java %}
 EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs")
 .withPartitioningMethod(PartitioningMethod.EVENT_HUB_HASHING);
 {% endhighlight %}
-##### Using config properties
-{% highlight jproperties %}
-systems.eh-system.partition.method = EVENT_HUB_HASHING
-{% endhighlight %}
 ### Consumer groups
-Event Hubs supports the notion of [consumer groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups) which enable multiple applications to have their own view of the event stream. Each partition is exclusively consumed by one consumer in the consumer group. Each event hub stream has a pre-defined consumer group named $Default. You can define your own consumer group for your job by configuring an Event Hubs.consumer.group
+Event Hubs supports the notion of [consumer groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups) which enable multiple applications to have their own view of the event stream. Each partition is exclusively consumed by one consumer in the group. Each event hub stream has a pre-defined consumer group named $Default. You can define your own consumer group for your job using `withConsumerGroup`.
-##### Using descriptors
 {% highlight java %}
 EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
 EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
@@ -178,41 +96,54 @@ EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
 .withConsumerGroup("my-group");
 {% endhighlight %}
-##### Using config properties
-{% highlight jproperties %}
-streams.eh-input-stream.eventhubs.consumer.group = my-group
-{% endhighlight %}
-### Serde
+### Consumer buffer size
-By default, the messages from Event Hubs are sent and received as byte arrays. You can configure a serializer and deserializer for your message by setting a value for msg.serde for your stream.
+When the consumer reads a message from EventHubs, it appends them to a shared producer-consumer queue corresponding to its partition. This config determines the per-partition queue size. Setting a higher value for this config typically achieves a higher throughput at the expense of increased on-heap memory.
-##### Using descriptors
 {% highlight java %}
-JsonSerde inputSerde = new JsonSerde();
-EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
- systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, inputSerde);
-JsonSerde outputSerde = new JsonSerde();
-EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
- systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, outputSerde);
+ EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs")
+ .withReceiveQueueSize(10);
 {% endhighlight %}
-##### Using config properties
-{% highlight jproperties %}
-streams.input0.samza.msg.serde = json
-streams.output0.samza.msg.serde = json
-{% endhighlight %}
+### Code walkthrough
-### Consumer buffer size
+In this section, we will walk through a simple pipeline that reads from one EventHubs stream and copies each message to another output stream.
-When the consumer reads a message from event hubs, it appends them to a shared producer-consumer queue corresponding to its partition. This config determines the per-partition queue size. Setting a higher value for this config typically achieves a higher throughput at the expense of increased on-heap memory.
-##### Using descriptors
 {% highlight java %}
- EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs")
- .withReceiveQueueSize(10);
+1 EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs").withNumClientThreads(5);
+
+2 EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
+ systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, new StringSerde())
+ .withSasKeyName(..)
+ .withSasKey(..));
+
+3 EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
+ systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, serde)
+ .withSasKeyName(..))
+ .withSasKey(..));
+
+4 MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
+5 OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
+
+ // Define the execution flow with the high-level API
+6 eventhubInput
+7 .map((message) -> {
+8 System.out.println("Received Key: " + message.getKey());
+9 System.out.println("Received Message: " + message.getValue());
+10 return message;
+11 })
+12 .sendTo(eventhubOutput);
 {% endhighlight %}
-##### Using config properties
-{% highlight jproperties %}
-systems.eh-system.eventhubs.receive.queue.size = 10
-{% endhighlight %}
\ No newline at end of file
+-Line 1 instantiates an `EventHubsSystemDescriptor` configuring an EventHubsClient with 5 threads. To consume from other input sources like Kafka, you can define their corresponding descriptors.
+
+-Line 2 creates an `EventHubsInputDescriptor` with a String serde for its values. Recall that Samza follows a KV data-model for input messages. In the case of EventHubs, the key is a string which is set to the [partitionKey](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__) in the message. Hence, no separate key serde is required.
+
+-Line 3 creates an `EventHubsOutputDescriptor` to write to an EventHubs stream with the given credentials.
+
+-Line 4 obtains a `MessageStream` from the input descriptor that you can later chain operations on.
+
+-Line 5 creates an `OutputStream` with the previously defined `EventHubsOutputDescriptor` that you can send messages to.
+
+-Line 7-12 define a simple pipeline that copies message from one EventHubs stream to another
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/299c0317/docs/learn/documentation/versioned/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/kafka.md b/docs/learn/documentation/versioned/connectors/kafka.md
index 447bfdc..b71c736 100644
--- a/docs/learn/documentation/versioned/connectors/kafka.md
+++ b/docs/learn/documentation/versioned/connectors/kafka.md
@@ -105,11 +105,10 @@ The above example configures Samza to ignore checkpointed offsets for `page-view
-### Code walkthrough
+### Code walkthrough: High-level API
-In this section, we walk through a complete example.
+In this section, we walk through a complete example that reads from a Kafka topic, filters a few messages and writes them to another topic.
-#### High-level API
 {% highlight java %}
 // Define coordinates of the Kafka cluster using the KafkaSystemDescriptor
 1 KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
@@ -147,4 +146,4 @@ In this section, we walk through a complete example.
 - Line 10 creates an OuputStream for the output topic
-- Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream
\ No newline at end of file
+- Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream

http://git-wip-us.apache.org/repos/asf/samza/blob/299c0317/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md
index 8717c28..9dde172 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -163,7 +163,9 @@ Rest all of the system properties will be set exactly like in the case of log4j,
 If you are already using log4j and want to upgrade to using log4j2, following are the changes you will need to make in your job:
 - Clean your lib directory. This will be rebuilt with new dependency JARs and xml files.
+
 - Replace log4j’s dependencies with log4j2’s in your pom.xml/build.gradle as mentioned above. Please ensure that none of log4j’s dependencies remain in pom.xml/build.gradle
+
 - Create a log4j2.xml to match your existing log4j.xml file.
 - Rebuild your application
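
As a reading aid for the "Producer partitioning" hunk in eventhubs.md, here is a minimal Java sketch of the two schemes whose descriptions are visible in the diff, EVENT_HUB_HASHING and ROUND_ROBIN. It is not Samza's or the Event Hubs client's implementation. The class `PartitioningSketch`, its method names, and the use of `String.hashCode()` as the hash function are assumptions made purely for illustration. The second scheme in the list falls outside the hunk context and is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: the names and the hash function are assumptions,
// not Samza or Event Hubs APIs.
class PartitioningSketch {
    private final int numPartitions;
    private final AtomicInteger nextPartition = new AtomicInteger();

    PartitioningSketch(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // EVENT_HUB_HASHING-style: the partition is derived from a hash of the
    // partition key, so events with the same key land on the same partition.
    int hashPartition(String partitionKey) {
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    // ROUND_ROBIN-style: keys are ignored and partitions are used in turn.
    int roundRobinPartition() {
        return nextPartition.getAndUpdate(i -> (i + 1) % numPartitions);
    }

    public static void main(String[] args) {
        PartitioningSketch sketch = new PartitioningSketch(4);
        // The same key is hashed twice to show that the mapping is stable.
        System.out.println(sketch.hashPartition("user-1") == sketch.hashPartition("user-1"));
        // Successive calls cycle through the partitions.
        for (int i = 0; i < 5; i++) {
            System.out.println(sketch.roundRobinPartition());
        }
    }
}
```

In an actual job the scheme is chosen through `withPartitioningMethod` on the `EventHubsSystemDescriptor`, as the diff shows. The sketch only illustrates why hashing preserves per-key ordering while round-robin trades that for an even spread.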