[ 
https://issues.apache.org/jira/browse/STORM-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089490#comment-15089490
 ] 

ASF GitHub Bot commented on STORM-1199:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/936#discussion_r49209735
  
    --- Diff: external/storm-hdfs/README.md ---
    @@ -405,7 +410,123 @@ On worker hosts the bolt/trident-state code will use the keytab file with princi
     Namenode. This method is little dangerous as you need to ensure all workers have the keytab file at the same location and you need
     to remember this as you bring up new hosts in the cluster.
     
    -## License
    +---
    +
    +# HDFS Spout
    +
    +The HDFS spout allows feeding data into Storm from an HDFS directory.
    +It actively monitors the directory and consumes any new files that appear in it.
    +Currently, the HDFS spout does not support Trident.
    +
    +**Important**: The HDFS spout assumes that files made visible to it in the monitored directory
    +are NOT being actively written to. A file should be made visible to the spout only after it has been
    +completely written. This can be achieved either by writing the file to another directory and, once it is
    +completely written, moving it into the monitored directory, or by creating the file with a '.ignore'
    +suffix in the monitored directory and renaming it to drop the suffix once the data is completely
    +written. File names with a '.ignore' suffix are ignored by the spout.
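    +
    +For example, a producer can write a file under the '.ignore' suffix and rename it once the write is
    +complete. The sketch below uses the standard Hadoop FileSystem API; the URI, paths and file names are
    +illustrative:
    +
    +```java
    +import java.net.URI;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:54310"), new Configuration());
    +
    +// Write under a '.ignore' suffix so the spout skips the file while it is still incomplete
    +Path inProgress = new Path("/source/data-0001.txt.ignore");
    +Path finished   = new Path("/source/data-0001.txt");
    +try (FSDataOutputStream out = fs.create(inProgress)) {
    +    out.writeBytes("record-1\nrecord-2\n");
    +}
    +
    +// Drop the suffix only after the file is completely written; the spout will now pick it up
    +fs.rename(inProgress, finished);
    +```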
    +
    +While the spout is actively consuming a file, it renames the file with an '.inprogress' suffix.
    +After all of the file's contents have been consumed, the file is moved to a configurable *done*
    +directory and the '.inprogress' suffix is dropped.
    +
    +**Concurrency**: If multiple spout instances are used in the topology, each instance consumes
    +a different file. Synchronization among spout instances is done using lock files created in a
    +'.lock' subdirectory (by default) under the monitored directory. A file with the same name
    +as the file being consumed (without the '.inprogress' suffix) is created in the lock directory.
    +Once the file is completely consumed, the corresponding lock file is deleted.
    +
    +**Recovery from failure**
    +Periodically, the spout records in the lock file how much of the file has been consumed.
    +If a spout instance crashes (or the topology is force killed), another spout can take over the file
    +and resume from the location recorded in the lock file.
    +
    +Certain error conditions (such as a spout crashing) can leave lock files behind without deleting them.
    +Such a stale lock file indicates that the corresponding input file has not been completely
    +processed. When detected, ownership of such stale lock files is transferred to another spout.
    +The setting 'hdfsspout.lock.timeout.sec' specifies the duration of inactivity after
    +which lock files are considered stale. For lock file ownership transfer to succeed, the HDFS
    +lease on the file (held by the previous lock owner) must have expired. Spouts scan for stale lock
    +files before selecting the next file for consumption.
    +
    +**Lock on *.lock* Directory**
    +HDFS spout instances create a *DIRLOCK* file in the .lock directory to coordinate certain accesses to
    +the .lock directory itself. A spout tries to create it when it needs access to the .lock directory and
    +deletes it when done. In error conditions such as a topology crash, force kill or untimely death
    +of a spout, this file may not get deleted. Future running instances of the spout will eventually
    +recover it once the DIRLOCK file becomes stale due to inactivity for hdfsspout.lock.timeout.sec seconds.
    +
    +## Usage
    +
    +The following example creates an HDFS spout that reads text files from the HDFS path hdfs://localhost:54310/source.
    +
    +```java
    +// Instantiate spout
    +HdfsSpout textReaderSpout = new HdfsSpout().withOutputFields(TextFileReader.defaultFields);
    +// HdfsSpout seqFileReaderSpout = new HdfsSpout().withOutputFields(SequenceFileReader.defaultFields);
    +
    +// textReaderSpout.withConfigKey("custom.keyname"); // Optional. Not normally required unless you need to change the key name used to provide HDFS settings. This key name defaults to 'hdfs.config'
    +
    +// Configure it
    +Config conf = new Config();
    +conf.put(Configs.SOURCE_DIR, "hdfs://localhost:54310/source");
    +conf.put(Configs.ARCHIVE_DIR, "hdfs://localhost:54310/done");
    +conf.put(Configs.BAD_DIR, "hdfs://localhost:54310/badfiles");
    +conf.put(Configs.READER_TYPE, "text"); // or 'seq' for sequence files
    +
    +// Create & configure topology
    +TopologyBuilder builder = new TopologyBuilder();
    +builder.setSpout("hdfsspout", textReaderSpout, SPOUT_NUM);
    +
    +// Setup bolts and other topology configuration
    +     ..snip..
    +
    +// Submit topology with config
    +StormSubmitter.submitTopologyWithProgressBar("topologyName", conf, builder.createTopology());
    +```
    +
    +See the sample HdfsSpoutTopology in storm-starter.
    +
    +## Configuration Settings
    +The HdfsSpout class provides the following methods for configuration:
    +
    +`HdfsSpout withOutputFields(String... fields)` : Sets the names of the output fields.
    +The number of fields depends upon the reader being used. For convenience, the built-in reader types
    +expose a static member called `defaultFields` that can be used for this.
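    +
    +For example, the default field name of the built-in text reader can be replaced with a custom one.
    +The sketch below assumes the text reader emits a single field per record; the field name is illustrative:
    +
    +```java
    +// Name the text reader's single output field "record" instead of using TextFileReader.defaultFields
    +HdfsSpout spout = new HdfsSpout().withOutputFields("record");
    +```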
    + 
    + `HdfsSpout withConfigKey(String configKey)`
    +Optional setting. It overrides the default key name ('hdfs.config') with a new name for
    +specifying HDFS configs. Typically used to specify the Kerberos keytab and principal.
    +
    +**E.g.:**
    +```java
    +    HashMap<String, Object> map = new HashMap<>();
    +    map.put("hdfs.keytab.file", "/path/to/keytab");
    +    map.put("hdfs.kerberos.principal", "u...@example.com");
    +    conf.put("hdfs.config", map);
    +```
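    +
    +If the key name is overridden via `withConfigKey`, the same map is supplied under the custom key.
    +A minimal sketch, where the key name 'my.hdfs.config' and the keytab/principal values are illustrative:
    +
    +```java
    +HdfsSpout spout = new HdfsSpout()
    +        .withOutputFields(TextFileReader.defaultFields)
    +        .withConfigKey("my.hdfs.config");          // custom key name (illustrative)
    +
    +HashMap<String, Object> hdfsSettings = new HashMap<>();
    +hdfsSettings.put("hdfs.keytab.file", "/path/to/keytab");
    +hdfsSettings.put("hdfs.kerberos.principal", "user@EXAMPLE.COM");
    +conf.put("my.hdfs.config", hdfsSettings);          // supplied under the custom key
    +```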
    +
    +Only settings mentioned in **bold** are required.
    +
    +| Setting                      | Default     | Description |
    +|------------------------------|-------------|-------------|
    +|**hdfsspout.reader.type**     |             | Indicates the reader for the file format. Set to 'seq' for reading sequence files or 'text' for text files. Set to a fully qualified class name if using a custom type (that implements the interface org.apache.storm.hdfs.spout.FileReader). |
    +|**hdfsspout.hdfs**            |             | HDFS URI. Example: hdfs://namenodehost:8020 |
    +|**hdfsspout.source.dir**      |             | HDFS location from which to read. E.g. /data/inputfiles |
    +|**hdfsspout.archive.dir**     |             | After a file is processed completely, it is moved to this directory. E.g. /data/done |
    +|**hdfsspout.badfiles.dir**    |             | If there is an error parsing a file's contents, the file is moved to this location. E.g. /data/badfiles |
    +|hdfsspout.lock.dir            | '.lock' subdirectory under hdfsspout.source.dir | Directory in which lock files are created. Concurrent HDFS spout instances synchronize using *lock* files. Before processing a file, a spout instance creates a lock file in this directory with the same name as the input file and deletes it after processing the file. Spouts also periodically record their progress (in reading the input file) in the lock file so that another spout instance can resume that progress if the spout dies for any reason. |
    +|hdfsspout.ignore.suffix       |   .ignore   | File names with this suffix in the hdfsspout.source.dir location will not be processed. |
    +|hdfsspout.commit.count        |    20000    | Record progress in the lock file after this many records have been processed. If set to 0, this criterion is not used. |
    +|hdfsspout.commit.sec          |    10       | Record progress in the lock file after this many seconds have elapsed. Must be greater than 0. |
    +|hdfsspout.max.outstanding     |   10000     | Limits the number of un-ACKed tuples by pausing tuple generation (if ACKers are used in the topology). |
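    +
    +For instance, the optional tuning settings can be supplied on the topology `Config` using the key names
    +from the table above. A sketch; the values shown are purely illustrative, not recommendations:
    +
    +```java
    +Config conf = new Config();
    +conf.put("hdfsspout.lock.timeout.sec", 300);  // treat lock files as stale after 5 minutes of inactivity
    +conf.put("hdfsspout.commit.count", 50000);    // checkpoint progress in the lock file every 50,000 records
    +conf.put("hdfsspout.commit.sec", 30);         // ... or every 30 seconds, whichever comes first
    +conf.put("hdfsspout.max.outstanding", 5000);  // pause emitting when 5,000 tuples are un-ACKed
    +```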
    --- End diff --
    
    Do we want to explain how this works with max.spout.pending, and why we need both?


> Create HDFS Spout
> -----------------
>
>                 Key: STORM-1199
>                 URL: https://issues.apache.org/jira/browse/STORM-1199
>             Project: Apache Storm
>          Issue Type: New Feature
>            Reporter: Roshan Naik
>            Assignee: Roshan Naik
>         Attachments: HDFSSpoutforStorm v2.pdf, HDFSSpoutforStorm.pdf, 
> hdfs-spout.1.patch
>
>
> Create an HDFS spout so that Storm can suck in data from files in a HDFS 
> directory



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
