Repository: samza
Updated Branches:
  refs/heads/master 80f5f388f -> ad578e241


samza documentation: hdfs and eventhubs connector

Author: Hai Lu <h...@linkedin.com>

Reviewers: Jagadish<jagad...@apache.org>

Closes #685 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ad578e24
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ad578e24
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ad578e24

Branch: refs/heads/master
Commit: ad578e24149f7d94465557dec852d9c36d9ee3b8
Parents: 80f5f38
Author: Hai Lu <h...@linkedin.com>
Authored: Tue Oct 2 22:17:15 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Oct 2 22:17:15 2018 -0700

----------------------------------------------------------------------
 .../versioned/connectors/eventhubs.md           |  95 ++++++++++++-
 .../documentation/versioned/connectors/hdfs.md  | 133 ++++++++++++++++++-
 docs/learn/documentation/versioned/index.html   |   2 +-
 3 files changed, 225 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ad578e24/docs/learn/documentation/versioned/connectors/eventhubs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/eventhubs.md 
b/docs/learn/documentation/versioned/connectors/eventhubs.md
index b99b46d..0f8766b 100644
--- a/docs/learn/documentation/versioned/connectors/eventhubs.md
+++ b/docs/learn/documentation/versioned/connectors/eventhubs.md
@@ -19,6 +19,97 @@ title: Eventhubs Connector
    limitations under the License.
 -->
 
-# Section 1
-# Section 2
+## Overview
 
+The Samza EventHubs connector provides access to [Azure 
Eventhubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features),
 Microsoft’s data streaming service on Azure. An event hub is similar to a 
Kafka topic and can have multiple partitions with producers and consumers. Each 
message produced or consumed from an event hub is an instance of 
[EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data).
+
+## Consuming from EventHubs
+
+Samza’s 
[EventHubSystemConsumer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java)
 wraps the EventData into an 
[EventHubIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java).
 Samza's eventhubs consumer wraps each message from Eventhubs into an 
EventHubMessageEnvelope. The envelope has two fields of interest - the key, 
which is set to the event's partition key and the message, which is set to the 
actual data in the event.
+
+You can configure your Samza jobs to process data from Azure Eventhubs. To 
configure Samza to consume from EventHub streams:
+
+{% highlight jproperties %}
+# define an event hub system factory with your identifier. eg: eh-system
+systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+
+# define your streams
+systems.eh-system.stream.list=eh-input-stream
+streams.eh-stream.samza.system=eh-system
+
+# define required properties for your streams
+streams.eh-input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
+streams.eh-input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
+streams.eh-input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
+streams.eh-input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+{% endhighlight %}
+
+It is required to provide values for YOUR-STREAM-NAMESPACE, YOUR-ENTITY-NAME, 
YOUR-SAS-KEY-NAME, YOUR-SAS-KEY-TOKEN to read or write to the stream.
+
+## Producing to EventHubs
+
+Similarly, you can also configure your Samza job to write to EventHubs. Follow 
the same configs defined in the Consuming from EventHubs section to write to 
EventHubs:
+
+{% highlight jproperties %}
+# define an event hub system factory with your identifier. eg: eh-system
+systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+
+# define your streams
+systems.eh-system.stream.list=eh-output-stream
+streams.eh-stream.samza.system=eh-system
+
+streams.eh-output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
+streams.eh-output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
+streams.eh-output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
+streams.eh-output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+{% endhighlight %}
+
+Then you can create and produce a message to eventhubs in your code as below:
+
+{% highlight java %}
+OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new 
SystemStream("eh-system", "output0"), key, message); 
+collector.send(envelope);
+{% endhighlight %}
+
+Each 
[OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html)
 is converted into an 
[EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data)
 instance whose body is set to the message in the envelope. Additionally, the 
key and the produce timestamp are set as properties in the EventData before 
sending it to EventHubs.
+
+## Advanced configuration
+
+###Producer partitioning
+
+The partition.method property determines how outgoing messages are 
partitioned. Valid values for this config are EVENT\_HUB\_HASHING, 
PARTITION\_KEY\_AS_PARTITION or ROUND\_ROBIN.
+
+1. EVENT\_HUB\_HASHING: By default, Samza computes the partition for an 
outgoing message based on the hash of its partition-key. This ensures that 
events with the same key are sent to the same partition. If this option is 
chosen, the partition key should be a string. If the partition key is not set, 
the key in the message is used for partitioning.
+
+2. PARTITION\_KEY\_AS\_PARTITION: In this method, each message is sent to the 
partition specified by its partition key. This requires the partition key to be 
an integer. If the key is greater than the number of partitions, a modulo 
operation will be performed on the key. Similar to EVENT\_HUB\_HASHING, the key 
in the message is used if the partition key is not specified.
+
+3. ROUND\_ROBIN: In this method, outgoing messages are distributed in a 
round-robin across all partitions. The key and the partition key in the message 
are ignored.
+
+{% highlight jproperties %}
+systems.eh-system.partition.method = EVENT_HUB_HASHING
+{% endhighlight %}
+
+### Consumer groups
+
+Eventhub supports the notion of [consumer 
groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups)
 which enable multiple applications to have their own view of the event stream. 
Each partition is exclusively consumed by one consumer in the consumer group. 
Each event hub stream has a pre-defined consumer group named $Default. You can 
define your own consumer group for your job by configuring a 
eventhubs.consumer.group
+
+{% highlight jproperties %}
+streams.eh-input-stream.eventhubs.consumer.group = my-group
+{% endhighlight %}
+
+### Serde
+
+By default, the messages from EventHubs are sent and received as byte arrays. 
You can configure a serializer and deserializer for your message by setting a 
value for msg.serde for your stream.
+
+{% highlight jproperties %}
+streams.input0.samza.msg.serde = json
+streams.output0.samza.msg.serde = json
+{% endhighlight %}
+
+### Consumer buffer size
+
+When the consumer reads a message from event hubs, it appends them to a shared 
producer-consumer queue corresponding to its partition. This config determines 
the per-partition queue size. Setting a higher value for this config typically 
achieves a higher throughput at the expense of increased on-heap memory.
+
+{% highlight jproperties %}
+systems.eh-system.eventhubs.receive.queue.size = 10
+{% endhighlight %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/ad578e24/docs/learn/documentation/versioned/connectors/hdfs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/hdfs.md 
b/docs/learn/documentation/versioned/connectors/hdfs.md
index a78c4aa..9692d18 100644
--- a/docs/learn/documentation/versioned/connectors/hdfs.md
+++ b/docs/learn/documentation/versioned/connectors/hdfs.md
@@ -19,6 +19,135 @@ title: HDFS Connector
    limitations under the License.
 -->
 
-# Section 1
-# Section 2
+## Overview
 
+Samza applications can read and process data stored in HDFS. Likewise, you can 
also write processed results to HDFS.
+
+### Environment Requirement
+
+Your job needs to run on the same YARN cluster which hosts the HDFS you want 
to consume from (or write into).
+
+## Consuming from HDFS
+
+You can configure your Samza job to read from HDFS files with the 
[HdfsSystemConsumer](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java).
 Avro encoded records are supported out of the box and it is easy to extend to 
support other formats (plain text, csv, json etc). See Event Format section 
below.
+
+### Partitioning
+
+Partitioning works at the level of individual directories and files. Each 
directory is treated as its own stream, while each of its files is treated as a 
partition. For example, when reading from a directory on HDFS with 10 files, 
there will be 10 partitions created. This means that you can have up-to 10 
containers to process them. If you want to read from a single HDFS file, there 
is currently no way to break down the consumption - you can only have one 
container to process the file.
+
+### Event format
+
+Samza's HDFS consumer wraps each avro record read from HDFS into a 
message-envelope. The 
[Envelope](../api/javadocs/org/apache/samza/system/IncomingMessageEnvelope.html)
 contains three fields of interest:
+
+1. The key, which is empty
+2. The message, which is set to the avro 
[GenericRecord](https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html)
+3. The stream partition, which is set to the name of the HDFS file
+
+To support input formats which are not avro, you can implement the 
[SingleFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java)
 interface (example: 
[AvroFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java))
+
+### End of stream support
+
+While streaming sources like Kafka are unbounded, files on HDFS have finite 
data and have a notion of end-of-file.
+
+When reading from HDFS, your Samza job automatically exits after consuming all 
the data. You can choose to implement 
[EndOfStreamListenerTask](../api/javadocs/org/apache/samza/task/EndOfStreamListenerTask.html)
 to receive a callback when reaching end of stream. 
+
+### Basic Configuration
+
+Here is a few of the basic configs which are required to set up 
HdfsSystemConsumer:
+
+{% highlight jproperties %}
+# The HDFS system consumer is implemented under the 
org.apache.samza.system.hdfs package,
+# so use HdfsSystemFactory as the system factory for your system
+systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
+
+# Define the hdfs stream
+streams.hdfs-clickstream.samza.system=hdfs
+
+# You need to specify the path of files you want to consume
+streams.hdfs-clickstream.samza.physical.name=hdfs:/data/clickstream/2016/09/11
+
+# You can specify a white list of files you want your job to process (in Java 
Pattern style)
+systems.hdfs.partitioner.defaultPartitioner.whitelist=.*avro
+
+# You can specify a black list of files you don't want your job to process (in 
Java Pattern style),
+# by default it's empty.
+# Note that you can have both white list and black list, in which case both 
will be applied.
+systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro
+{% endhighlight %}
+
+### Security Configuration
+
+The following additional configs are required when accessing HDFS clusters 
that have kerberos enabled:
+
+{% highlight jproperties %}
+# When the job is running in a secure environment, use the 
SamzaYarnSecurityManagerFactory, which fetches and renews the Kerberos 
delegation tokens
+job.security.manager.factory=org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory
+
+# Kerberos principal
+yarn.kerberos.principal=your-principal-name
+
+# Path of the keytab file (local path)
+yarn.kerberos.keytab=/tmp/keytab
+{% endhighlight %}
+
+### Advanced Configuration
+
+Some of the advanced configuration you might need to set up:
+
+{% highlight jproperties %}
+# Specify the group pattern for advanced partitioning.
+systems.hdfs-clickstream.partitioner.defaultPartitioner.groupPattern=part-[id]-.*
+
+# Specify the type of files your job want to process (support avro only for 
now)
+systems.hdfs-clickstream.consumer.reader=avro
+
+# Max number of retries (per-partition) before the container fails.
+system.hdfs-clickstream.consumer.numMaxRetries=10
+{% endhighlight %}
+
+The advanced partitioning goes beyond the basic assumption that each file is a 
partition. With advanced partitioning you can group files into partitions 
arbitrarily. For example, if you have a set of files as [part-01-a.avro, 
part-01-b.avro, part-02-a.avro, part-02-b.avro, part-03-a.avro] that you want 
to organize into three partitions as (part-01-a.avro, part-01-b.avro), 
(part-02-a.avro, part-02-b.avro), (part-03-a.avro), where the numbers in the 
middle act as a “group identifier”, you can then set this property to be 
“part-[id]-.” (note that "[id]" is a reserved term here, i.e. you have to 
literally put it as [id]). The partitioner will apply this pattern to all file 
names and extract the “group identifier” (“[id]” in the pattern), then 
use the “group identifier” to group files into partitions.
+
+## Producing to HDFS
+
+The samza-hdfs module implements a Samza Producer to write to HDFS. The 
current implementation includes a ready-to-use HdfsSystemProducer, and two 
HdfsWriters: One that writes messages of raw bytes to a SequenceFile of 
BytesWritable keys and values. Another writes out Avro data files including the 
schema automatically reflected from the POJO objects fed to it.
+
+### Configuring an HdfsSystemProducer
+
+You can configure an HdfsSystemProducer like any other Samza system: using 
configuration keys and values set in a job.properties file. You might configure 
the system producer for use by your StreamTasks like this:
+
+{% highlight jproperties %}
+# set the SystemFactory implementation to instantiate HdfsSystemProducer 
aliased to 'hdfs'
+systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
+
+# Assign the implementation class for this system's HdfsWriter
+systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
+#systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
+# define a serializer/deserializer for the hdfs system
+# DO NOT define (i.e. comment out) a SerDe when using the 
AvroDataFileHdfsWriter so it can reflect the schema
+systems.hdfs.samza.msg.serde=some-serde-impl
+
+# Assign a serde implementation to be used for the stream called "metrics"
+systems.hdfs.streams.metrics.samza.msg.serde=some-metrics-impl
+
+# Set compression type supported by chosen Writer.
+# AvroDataFileHdfsWriter supports snappy, bzip2, deflate or none
+systems.hdfs.producer.hdfs.compression.type=snappy
+
+# The base dir for HDFS output. Output is structured into buckets. The default 
Bucketer for SequenceFile HdfsWriters
+# is currently /BASE/JOB_NAME/DATE_PATH/FILES, where BASE is set below
+systems.hdfs.producer.hdfs.base.output.dir=/user/me/analytics/clickstream_data
+
+# Assign the implementation class for the HdfsWriter's Bucketer
+systems.hdfs.producer.hdfs.bucketer.class=org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer
+
+# Configure the DATE_PATH the Bucketer will set to bucket output files by day 
for this job run.
+systems.hdfs.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd
+
+# Optionally set the max output bytes (records for AvroDataFileHdfsWriter) per 
file.
+# A new file will be cut and output continued on the next write call each time 
this many bytes
+# (records for AvroDataFileHdfsWriter) have been written.
+systems.hdfs.producer.hdfs.write.batch.size.bytes=134217728
+#systems.hdfs.producer.hdfs.write.batch.size.records=10000
+{% endhighlight %}
+
+The above configuration assumes a Metrics and Serde implementation has been 
properly configured against the some-metrics-impl and some-serde-impl and 
labels somewhere else in the same job.properties file. Each of these properties 
has a reasonable default, so you can leave out the ones you don’t need to 
customize for your job run.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/ad578e24/docs/learn/documentation/versioned/index.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/index.html 
b/docs/learn/documentation/versioned/index.html
index 80035bb..94f7e18 100644
--- a/docs/learn/documentation/versioned/index.html
+++ b/docs/learn/documentation/versioned/index.html
@@ -51,7 +51,7 @@ title: Documentation
 <ul class="documentation-list">
   <li><a href="connectors/overview.html">Connectors overview</a></li>
   <li><a href="connectors/kafka.html">Apache Kafka</a></li>
-  <li><a href="connectors/hdfs.html">Apache Hadoop</a></li>
+  <li><a href="connectors/hdfs.html">HDFS</a></li>
   <li><a href="connectors/eventhubs.html">Azure EventHubs</a></li>
   <li><a href="connectors/kinesis.html">AWS Kinesis</a></li>
 </ul>

Reply via email to