Cleanup docs for HDFS connector

Author: Jagadish <jvenkatra...@linkedin.com>
Reviewers: Jagadish <jagad...@apache.org>

Closes #793 from vjagadish1989/website-reorg30

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3e397022
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3e397022
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3e397022

Branch: refs/heads/1.0.0
Commit: 3e397022a5a54630d21a1cbbc0c273016592a0c2
Parents: ac5f948
Author: Jagadish <jvenkatra...@linkedin.com>
Authored: Fri Nov 2 17:35:20 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Nov 13 19:33:26 2018 -0800

----------------------------------------------------------------------
 .../documentation/versioned/connectors/hdfs.md | 134 +++++------
 1 file changed, 50 insertions(+), 84 deletions(-)
----------------------------------------------------------------------

docs/learn/documentation/versioned/connectors/hdfs.md
(http://git-wip-us.apache.org/repos/asf/samza/blob/3e397022/docs/learn/documentation/versioned/connectors/hdfs.md)

title: HDFS Connector

## Overview

The HDFS connector allows your Samza jobs to read data stored in HDFS files. Likewise, you can write processed results back to HDFS. To interact with HDFS, Samza requires your job to run on the same YARN cluster that hosts the HDFS you want to consume from (or write to).

## Consuming from HDFS

You can configure your Samza job to read from HDFS files with the [HdfsSystemConsumer](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java). Avro-encoded records are supported out of the box, and it is easy to extend support to other formats (plain text, CSV, JSON, etc.); see the Input Event Format section below.

### Input Partitioning

Partitioning works at the level of individual directories and files. Each directory is treated as its own stream, and each of its files is treated as a _partition_. For example, Samza creates five partitions when reading from a directory that contains five files, which means up to five containers can process them in parallel. There is currently no way to parallelize consumption of a single file - only one container can process it.

### Input Event Format

Samza supports Avro natively, and it is easy to extend to other serialization formats. Each Avro record read from HDFS is wrapped into a message envelope. The [envelope](../api/javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains three fields of interest:

- The key, which is empty

- The message, which is set to the Avro [GenericRecord](https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html)

- The stream partition, which is set to the name of the HDFS file

To support non-Avro input formats, you can implement the [SingleFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java) interface (see [AvroFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java) for an example implementation).
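For illustration, a task that consumes these envelopes might look like the following minimal sketch. The task class and the Avro field name are hypothetical; the envelope accessors are the ones described above.

{% highlight java %}
import org.apache.avro.generic.GenericRecord;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class ClickstreamTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // The message is the deserialized Avro record; the key is empty.
    GenericRecord record = (GenericRecord) envelope.getMessage();

    // The partition identifies the HDFS file this record was read from.
    String sourceFile = envelope.getSystemStreamPartition().getPartition().toString();

    // 'memberId' is a hypothetical field in your Avro schema.
    Object memberId = record.get("memberId");
    // ... process the record ...
  }
}
{% endhighlight %}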
### End of Stream

While streaming sources like Kafka are unbounded, files on HDFS contain finite data and have a notion of end-of-file. When reading from HDFS, your Samza job automatically exits after it has consumed all the data. You can implement [EndOfStreamListenerTask](../api/javadocs/org/apache/samza/task/EndOfStreamListenerTask.html) to receive a callback once end of stream has been reached.
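A sketch of wiring up that callback is shown below, assuming the listener callback receives the same MessageCollector and TaskCoordinator arguments as the other task callbacks.

{% highlight java %}
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.EndOfStreamListenerTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class ClickstreamTask implements StreamTask, EndOfStreamListenerTask {

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // ... process each record as shown in the previous sketch ...
  }

  @Override
  public void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
    // Called once every file (partition) assigned to this task has been fully consumed.
    // Flush any buffered results here; the job shuts down after all tasks reach end of stream.
  }
}
{% endhighlight %}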
### Defining Streams

Samza uses the notion of a _system_ to describe any I/O source it interacts with. To consume from HDFS, create a new system whose factory is `HdfsSystemFactory`. You can then associate multiple streams with this system. Each stream should have a _physical name_, which is the path of the directory to read on HDFS.

{% highlight jproperties %}
# The HDFS system consumer is implemented under the org.apache.samza.system.hdfs package,
# so use HdfsSystemFactory as the system factory for your system
systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory

# Define the hdfs stream and the path of the files you want to consume
streams.hdfs-clickstream.samza.system=hdfs
streams.hdfs-clickstream.samza.physical.name=hdfs:/data/clickstream/2016/09/11
{% endhighlight %}

The above example defines a stream called `hdfs-clickstream` that reads data from the `/data/clickstream/2016/09/11` directory.

#### Whitelists and Blacklists

If you only want to consume files that match a certain pattern, you can configure a whitelist (in Java Pattern style). Likewise, you can blacklist files you don't want your job to process; by default the blacklist is empty. When both are specified, the whitelist first selects the files to process and the blacklist is then applied to its results.

{% highlight jproperties %}
systems.hdfs.partitioner.defaultPartitioner.whitelist=.*avro
systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro
{% endhighlight %}

#### Advanced Configuration

{% highlight jproperties %}
# Specify the group pattern for advanced partitioning
systems.hdfs-clickstream.partitioner.defaultPartitioner.groupPattern=part-[id]-.*

# Specify the type of files your job wants to process (only avro is supported for now)
systems.hdfs-clickstream.consumer.reader=avro

# Max number of retries (per partition) before the container fails
systems.hdfs-clickstream.consumer.numMaxRetries=10
{% endhighlight %}

Advanced partitioning goes beyond the basic assumption that each file is a partition: it lets you group files into partitions arbitrarily. For example, suppose you have the files [part-01-a.avro, part-01-b.avro, part-02-a.avro, part-02-b.avro, part-03-a.avro] and want to organize them into three partitions as (part-01-a.avro, part-01-b.avro), (part-02-a.avro, part-02-b.avro), (part-03-a.avro), where the numbers in the middle act as a "group identifier". You can then set the group pattern to "part-[id]-.*" (note that "[id]" is a reserved term here, i.e. you have to literally put it as [id]). The partitioner applies this pattern to all file names, extracts the "group identifier" from each, and uses it to group files into partitions.

## Producing to HDFS

The samza-hdfs module implements a Samza producer that writes to HDFS. The implementation includes a ready-to-use HdfsSystemProducer and two HdfsWriters: one writes messages of raw bytes to a SequenceFile of BytesWritable keys and values; the other writes Avro data files, including a schema automatically reflected from the POJO objects fed to it.

### Output Format

Samza allows writing your output results to HDFS in Avro format. You can either use Avro GenericRecords or have Samza automatically infer the schema for your objects using reflection.

{% highlight jproperties %}
# Set the SystemFactory implementation to instantiate HdfsSystemProducer aliased to 'hdfs'
systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory

# Assign the implementation class for this system's HdfsWriter
systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter

# Do NOT define a msg serde when using the AvroDataFileHdfsWriter, so it can reflect the schema
{% endhighlight %}

If your output is not Avro, you can describe its format by implementing your own serde:

{% highlight jproperties %}
systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter

# Define a serializer/deserializer for the hdfs system
serializers.registry.my-serde-name.class=MySerdeFactory
systems.hdfs.samza.msg.serde=my-serde-name

# You can also assign a serde to an individual stream, e.g. the stream called "metrics"
systems.hdfs.streams.metrics.samza.msg.serde=some-metrics-impl
{% endhighlight %}
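`MySerdeFactory` above is only a placeholder class name. A minimal sketch of such a factory, assuming the payload is a plain UTF-8 string and using the `org.apache.samza.serializers` interfaces, could look like this:

{% highlight java %}
import java.nio.charset.StandardCharsets;

import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;

public class MySerdeFactory implements SerdeFactory<String> {
  @Override
  public Serde<String> getSerde(String name, Config config) {
    // A trivial serde that writes and reads UTF-8 encoded strings.
    return new Serde<String>() {
      @Override
      public byte[] toBytes(String message) {
        return message == null ? null : message.getBytes(StandardCharsets.UTF_8);
      }

      @Override
      public String fromBytes(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
      }
    };
  }
}
{% endhighlight %}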
### Output Directory Structure

Samza allows you to control the base HDFS directory for your output. Output is structured into buckets; the default bucketer for the SequenceFile HdfsWriters currently writes to /BASE/JOB_NAME/DATE_PATH/FILES, where BASE is the configured base output directory. You can also organize the output into dated sub-directories by configuring a date formatter for the bucketer.

{% highlight jproperties %}
# The base dir for HDFS output
systems.hdfs.producer.hdfs.base.output.dir=/user/me/analytics/clickstream_data

# Assign the implementation class for the HdfsWriter's Bucketer
systems.hdfs.producer.hdfs.bucketer.class=org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer

# Configure the DATE_PATH format the Bucketer uses to bucket output files by day for this job run
systems.hdfs.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd

# Set the compression type supported by the chosen Writer.
# AvroDataFileHdfsWriter supports snappy, bzip2, deflate or none
systems.hdfs.producer.hdfs.compression.type=snappy
{% endhighlight %}

You can also cap the maximum output bytes per file (or the maximum number of records per file for the AvroDataFileHdfsWriter). Once either limit has been reached, Samza cuts a new file and continues output there on the next write call.

{% highlight jproperties %}
systems.hdfs.producer.hdfs.write.batch.size.bytes=134217728
systems.hdfs.producer.hdfs.write.batch.size.records=10000
{% endhighlight %}

Each of these producer properties has a reasonable default, so you can leave out the ones you don't need to customize for your job run.
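Putting it together, a task emits records to the `hdfs` system like any other Samza output stream. Below is a sketch; the output stream name and POJO are hypothetical, and with the AvroDataFileHdfsWriter the Avro schema is reflected from the POJO being sent.

{% highlight java %}
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class ClickstreamToHdfsTask implements StreamTask {
  // "hdfs" is the system configured above; "page-views" is a hypothetical output stream name.
  private static final SystemStream HDFS_OUTPUT = new SystemStream("hdfs", "page-views");

  // A plain POJO; the AvroDataFileHdfsWriter reflects the Avro schema from this class.
  public static class PageView {
    public String memberId;
    public long timestampMs;
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    PageView view = new PageView();
    view.memberId = "member-123";
    view.timestampMs = System.currentTimeMillis();

    // Send the POJO to HDFS; the writer batches and rolls files per the configs above.
    collector.send(new OutgoingMessageEnvelope(HDFS_OUTPUT, view));
  }
}
{% endhighlight %}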
+ +{% highlight jproperties %} +job.security.manager.factory=org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory + +# Kerberos principal +yarn.kerberos.principal=your-principal-name + +# Path of the keytab file (local path) +yarn.kerberos.keytab=/tmp/keytab +{% endhighlight %}