Added config hdfsspout.hdfs. Improved logging, bug fix in how hdfs specific 
settings are used, added storm-starter topology HdfsSpoutTopology, outputFields 
now need to be specified on spout. Improved docs, updated UTs.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ac1322fb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ac1322fb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ac1322fb

Branch: refs/heads/1.x-branch
Commit: ac1322fbe7bf8bc3dfb614118312cee37c75b44e
Parents: a6fed4c
Author: Roshan Naik <[email protected]>
Authored: Wed Dec 30 18:37:40 2015 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 .../jvm/storm/starter/HdfsSpoutTopology.java    | 75 ++++++++++------
 external/storm-hdfs/README.md                   | 92 ++++++++++++++++----
 .../org/apache/storm/hdfs/spout/Configs.java    |  1 +
 .../org/apache/storm/hdfs/spout/DirLock.java    |  8 +-
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 89 +++++++++++--------
 .../storm/hdfs/spout/SequenceFileReader.java    |  1 +
 .../apache/storm/hdfs/spout/TextFileReader.java |  1 +
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 32 +++----
 8 files changed, 198 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java 
b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
index 45a6aaf..3837943 100644
--- a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
@@ -33,6 +33,12 @@ import 
org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
 import org.apache.storm.hdfs.spout.Configs;
 import org.apache.storm.hdfs.spout.HdfsSpout;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.*;
+import backtype.storm.tuple.*;
+import backtype.storm.task.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
@@ -42,37 +48,47 @@ public class HdfsSpoutTopology {
   public static final String SPOUT_ID = "hdfsspout";
   public static final String BOLT_ID = "hdfsbolt";
 
-  public static final int SPOUT_NUM = 4;
-  public static final int BOLT_NUM = 4;
-  public static final int WORKER_NUM = 4;
+  public static final int SPOUT_NUM = 1;
+  public static final int BOLT_NUM = 1;
+  public static final int WORKER_NUM = 1;
 
+  public static class ConstBolt extends BaseRichBolt {
+    private static final long serialVersionUID = -5313598399155365865L;
+    public static final String FIELDS = "message";
+    private OutputCollector collector;
+    private static final Logger log = LoggerFactory.getLogger(ConstBolt.class);
 
-  private static HdfsBolt makeHdfsBolt(String arg, String destinationDir) {
-    DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat()
-            .withPath(destinationDir)
-            .withExtension(".txt");
-    RecordFormat format = new DelimitedRecordFormat();
-    FileRotationPolicy rotationPolicy = new TimedRotationPolicy(5.0f, 
TimedRotationPolicy.TimeUnit.MINUTES);
+    public ConstBolt() {
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {
+      this.collector = collector;
+    }
 
-    return new HdfsBolt()
-            .withConfigKey("hdfs.config")
-            .withFsUrl(arg)
-            .withFileNameFormat(fileNameFormat)
-            .withRecordFormat(format)
-            .withRotationPolicy(rotationPolicy)
-            .withSyncPolicy(new CountSyncPolicy(1000));
-  }
+    @Override
+    public void execute(Tuple tuple) {
+      log.info("Received tuple : {}", tuple.getValue(0));
+      collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields(FIELDS));
+    }
+  } // class
 
   /** Copies text file content from sourceDir to destinationDir. Moves source 
files into sourceDir after its done consuming
    *    args: sourceDir sourceArchiveDir badDir destinationDir
    */
   public static void main(String[] args) throws Exception {
     // 0 - validate args
-    if (args.length < 6) {
+    if (args.length < 7) {
       System.err.println("Please check command line arguments.");
       System.err.println("Usage :");
-      System.err.println(HdfsSpoutTopology.class.toString() + " topologyName 
fileFormat sourceDir sourceArchiveDir badDir destinationDir.");
+      System.err.println(HdfsSpoutTopology.class.toString() + " topologyName 
hdfsUri fileFormat sourceDir sourceArchiveDir badDir destinationDir.");
       System.err.println(" topologyName - topology name.");
+      System.err.println(" hdfsUri - hdfs name node URI");
       System.err.println(" fileFormat -  Set to 'TEXT' for reading text files 
or 'SEQ' for sequence files.");
       System.err.println(" sourceDir  - read files from this HDFS dir using 
HdfsSpout.");
       System.err.println(" sourceArchiveDir - after a file in sourceDir is 
read completely, it is moved to this HDFS location.");
@@ -85,14 +101,15 @@ public class HdfsSpoutTopology {
 
     // 1 - parse cmd line args
     String topologyName = args[0];
-    String fileFormat = args[1];
-    String sourceDir = args[2];
-    String sourceArchiveDir = args[3];
-    String badDir = args[4];
-    String destinationDir = args[5];
+    String hdfsUri = args[1];
+    String fileFormat = args[2];
+    String sourceDir = args[3];
+    String sourceArchiveDir = args[4];
+    String badDir = args[5];
+    String destinationDir = args[6];
 
     // 2 - create and configure spout and bolt
-    HdfsBolt bolt = makeHdfsBolt(args[0], destinationDir);
+    ConstBolt bolt = new ConstBolt();
     HdfsSpout spout = new HdfsSpout().withOutputFields("line");
 
     Config conf = new Config();
@@ -100,6 +117,10 @@ public class HdfsSpoutTopology {
     conf.put(Configs.ARCHIVE_DIR, sourceArchiveDir);
     conf.put(Configs.BAD_DIR, badDir);
     conf.put(Configs.READER_TYPE, fileFormat);
+    conf.put(Configs.HDFS_URI, hdfsUri);
+    conf.setDebug(true);
+    conf.setNumWorkers(1);
+    conf.setMaxTaskParallelism(1);
 
     // 3 - Create and configure topology
     conf.setDebug(true);
@@ -115,8 +136,8 @@ public class HdfsSpoutTopology {
     StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, 
builder.createTopology());
     Nimbus.Client client = 
NimbusClient.getConfiguredClient(clusterConf).getClient();
 
-    // 5 - Print metrics every 30 sec, kill topology after 5 min
-    for (int i = 0; i < 10; i++) {
+    // 5 - Print metrics every 30 sec, kill topology after 20 min
+    for (int i = 0; i < 40; i++) {
       Thread.sleep(30 * 1000);
       FastWordCountTopology.printMetrics(client, topologyName);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 3a64ae6..237fc8c 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -1,20 +1,61 @@
 # Storm HDFS
 
 Storm components for interacting with HDFS file systems
- - HDFS Bolt
  - HDFS Spout
- 
+ - HDFS Bolt 
 
 # HDFS Spout
 
+Hdfs spout is intended to allow feeding data into Storm from a HDFS directory. 
+It will actively monitor the directory to consume any new files that appear in 
the directory.
+
+**Impt**: Hdfs spout assumes that the files being made visible to it in the 
monitored directory 
+are NOT actively being written to. Only after a file is completely written 
should it be made
+visible to the spout. This can be achieved by either writing the files out to 
another directory 
+and once completely written, moving them to the monitored directory. Alternatively 
the file
+can be created with a '.ignore' suffix in the monitored directory and after 
data is completely 
+written, rename it without the suffix. File names with a '.ignore' suffix are 
ignored
+by the spout.
+
+When the spout is actively consuming a file, it renames the file with a 
'.inprogress' suffix.
+After consuming all the contents in the file, the file will be moved to a 
configurable *done* 
+directory and the '.inprogress' suffix will be dropped.
+
+**Concurrency** If multiple spout instances are used in the topology, each 
instance will consume
+a different file. Synchronization among spout instances is done using lock 
files created in 
+(by default) a '.lock' subdirectory under the monitored directory. A file with 
the same name
+as the file being consumed (with the in progress suffix) is created in the 
lock directory.
+Once the file is completely consumed, the corresponding lock file is deleted.
+
+**Recovery from failure**
+Periodically, the spout also records progress information regarding how much of 
the file has been
+consumed in the lock file. In case of a crash of the spout instance (or force 
kill of topology) 
+another spout can take over the file and resume from the location recorded in 
the lock file.
+
+Certain error conditions can cause lock files to go stale. Basically the lock 
file exists, 
+but no spout actively owns it, and therefore the file will not be deleted. 
Usually this indicates 
+that the corresponding input file has also not been completely processed. A 
configuration
+'hdfsspout.lock.timeout.sec' can be set to specify the duration of inactivity 
after which a lock file
+is considered stale. Stale lock files are candidates for automatic 
transfer of ownership to 
+another spout.
+
+**Lock on .lock Directory**
+The .lock directory contains another DIRLOCK file which is used to co-ordinate 
accesses to the 
+.lock dir itself among spout instances. A spout will try to create it when it 
needs access to
+the .lock directory and then delete it when done.  In case of a topology crash 
or force kill,
+if this file still exists, it should be deleted to allow the new topology 
instance to regain 
+full access to the  .lock directory and resume normal processing. 
+
 ## Usage
 
 The following example creates an HDFS spout that reads text files from HDFS 
path hdfs://localhost:54310/source.
 
 ```java
 // Instantiate spout
-HdfsSpout textReaderSpout = new HdfsSpout().withOutputFields("line");
-// HdfsSpout seqFileReaderSpout = new 
HdfsSpout().withOutputFields("key","value");
+HdfsSpout textReaderSpout = new 
HdfsSpout().withOutputFields(TextFileReader.defaultFields);
+// HdfsSpout seqFileReaderSpout = new 
HdfsSpout().withOutputFields(SequenceFileReader.defaultFields);
+
+// textReaderSpout.withConfigKey("custom.keyname"); // Optional. Not required 
normally unless you need to change the key name used to provide HDFS settings. 
This key name defaults to 'hdfs.config' 
 
 // Configure it
 Config conf = new Config();
@@ -34,21 +75,34 @@ builder.setSpout("hdfsspout", textReaderSpout, SPOUT_NUM);
 StormSubmitter.submitTopologyWithProgressBar("topologyName", conf, 
builder.createTopology());
 ```
 
-## HDFS Spout Configuration Settings
-
-| Setting                  | Default     | Description |
-|--------------------------|-------------|-------------|
-|**hdfsspout.reader.type** |             | Indicates the reader for the file 
format. Set to 'seq' for reading sequence files or 'text' for text files. Set 
to a fully qualified class name if using a custom type (that implements 
interface org.apache.storm.hdfs.spout.FileReader)|
-|**hdfsspout.source.dir**  |             | HDFS location from where to read.  
E.g. hdfs://localhost:54310/inputfiles       |
-|**hdfsspout.archive.dir** |             | After a file is processed 
completely it will be moved to this directory. E.g. hdfs://localhost:54310/done|
-|**hdfsspout.badfiles.dir**|             | if there is an error parsing a 
file's contents, the file is moved to this location.  E.g. 
hdfs://localhost:54310/badfiles  |
-|hdfsspout.ignore.suffix   |   .ignore   | File names with this suffix in the 
in the hdfsspout.source.dir location will not be processed|
-|hdfsspout.lock.dir        | '.lock' subdirectory under hdfsspout.source.dir | 
Dir in which lock files will be created. Concurrent HDFS spout instances 
synchronize using *lock*. Before processing a file the spout instance creates a 
lock file in this directory with same name as input file and deletes this lock 
file after processing the file. Spout also periodically makes a note of its 
progress (wrt reading the input file) in the lock file so that another spout 
instance can resume progress on the same file if the spout dies for any reason.|
-|hdfsspout.commit.count    |    20000    | Record progress in the lock file 
after these many records are processed. If set to 0, this criterion will not be 
used. |
-|hdfsspout.commit.sec      |    10       | Record progress in the lock file 
after these many seconds have elapsed. Must be greater than 0 |
-|hdfsspout.max.outstanding |   10000     | Limits the number of unACKed tuples 
by pausing tuple generation (if ACKers are used in the topology) |
-|hdfsspout.lock.timeout.sec|  5 minutes  | Duration of inactivity after which 
a lock file is considered to be abandoned and ready for another spout to take 
ownership |
-|hdfsspout.clocks.insync   |    true     | Indicates whether clocks on the 
storm machines are in sync (using services like NTP)       |
+## Configuration Settings
+Class HdfsSpout provides the following methods for configuration:
+
+`HdfsSpout withOutputFields(String... fields)` : This sets the names for the 
output fields. 
+The number of fields depends upon the reader being used. For convenience, 
built-in reader types 
+expose a static member called `defaultFields` that can be used for this. 
+ 
+ `HdfsSpout withConfigKey(String configKey)`
+Allows overriding the default key name (hdfs.config) with a new name for 
specifying HDFS configs. Typically used
+to provide Kerberos keytabs.
+
+Only settings mentioned in **bold** are required.
+
+| Setting                      | Default     | Description |
+|------------------------------|-------------|-------------|
+|**hdfsspout.reader.type**     |             | Indicates the reader for the 
file format. Set to 'seq' for reading sequence files or 'text' for text files. 
Set to a fully qualified class name if using a custom type (that implements 
interface org.apache.storm.hdfs.spout.FileReader)|
+|**hdfsspout.hdfs**            |             | HDFS URI. Example:  
hdfs://namenodehost:8020
+|**hdfsspout.source.dir**      |             | HDFS location from where to 
read.  E.g. /data/inputfiles  |
+|**hdfsspout.archive.dir**     |             | After a file is processed 
completely it will be moved to this directory. E.g. /data/done|
+|**hdfsspout.badfiles.dir**    |             | if there is an error parsing a 
file's contents, the file is moved to this location.  E.g. /data/badfiles  |
+|hdfsspout.lock.dir            | '.lock' subdirectory under 
hdfsspout.source.dir | Dir in which lock files will be created. Concurrent HDFS 
spout instances synchronize using *lock* files. Before processing a file the 
spout instance creates a lock file in this directory with same name as input 
file and deletes this lock file after processing the file. Spout also 
periodically makes a note of its progress (wrt reading the input file) in the 
lock file so that another spout instance can resume progress on the same file 
if the spout dies for any reason. When a topology is killed, if a .lock/DIRLOCK 
file is left behind it can be safely deleted to allow normal resumption of the 
topology on restart.|
+|hdfsspout.ignore.suffix       |   .ignore   | File names with this suffix in 
the in the hdfsspout.source.dir location will not be processed|
+|hdfsspout.commit.count        |    20000    | Record progress in the lock 
file after these many records are processed. If set to 0, this criterion will 
not be used. |
+|hdfsspout.commit.sec          |    10       | Record progress in the lock 
file after these many seconds have elapsed. Must be greater than 0 |
+|hdfsspout.max.outstanding     |   10000     | Limits the number of unACKed 
tuples by pausing tuple generation (if ACKers are used in the topology) |
+|hdfsspout.lock.timeout.sec    |  5 minutes  | Duration of inactivity after 
which a lock file is considered to be abandoned and ready for another spout to 
take ownership |
+|hdfsspout.clocks.insync       |    true     | Indicates whether clocks on the 
storm machines are in sync (using services like NTP)       |
+|hdfs.config (unless changed)  |             | Set it to a Map of Key/value 
pairs indicating the HDFS settings to be used. For example, keytab and 
principal could be set using this. See section **Using keytabs on all worker 
hosts** under HDFS bolt below.| 
 
 
 # HDFS Bolt

http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index 00db8eb..8911b3c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -23,6 +23,7 @@ public class Configs {
   public static final String TEXT = "text";
   public static final String SEQ = "seq";
 
+  public static final String HDFS_URI = "hdfsspout.hdfs";                   // 
Required - HDFS name node
   public static final String SOURCE_DIR = "hdfsspout.source.dir";           // 
Required - dir from which to read files
   public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";         // 
Required - completed files will be moved here
   public static final String BAD_DIR = "hdfsspout.badfiles.dir";            // 
Required - unparsable files will be moved here

http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index 0e1182f..cb8e015 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -81,8 +81,12 @@ public class DirLock {
 
   /** Release lock on dir by deleting the lock file */
   public void release() throws IOException {
-    fs.delete(lockFile, false);
-    log.info("Thread {} released dir lock {} ", threadInfo(), lockFile);
+    if(!fs.delete(lockFile, false)) {
+      log.error("Thread {} could not delete dir lock {} ", threadInfo(), 
lockFile);
+    }
+    else {
+      log.info("Thread {} released dir lock {} ", threadInfo(), lockFile);
+    }
   }
 
   /** if the lock on the directory is stale, take ownership */

http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 0e172a9..65a49f3 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -20,6 +20,7 @@ package org.apache.storm.hdfs.spout;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.net.URI;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -47,6 +48,7 @@ import backtype.storm.tuple.Fields;
 public class HdfsSpout extends BaseRichSpout {
 
   // user configurable
+  private String hdfsUri;            // required
   private String readerType;         // required
   private Fields outputFields;       // required
   private Path sourceDirPath;        // required
@@ -101,6 +103,13 @@ public class HdfsSpout extends BaseRichSpout {
     return this;
   }
 
+  /** set key name under which HDFS options are placed. (similar to HDFS bolt).
+   * default key name is 'hdfs.config' */
+  public HdfsSpout withConfigKey(String configKey) {
+    this.configKey = configKey;
+    return this;
+  }
+
   public Path getLockDirPath() {
     return lockDirPath;
   }
@@ -109,13 +118,6 @@ public class HdfsSpout extends BaseRichSpout {
     return collector;
   }
 
-  /** config key under which HDFS options are placed. (similar to HDFS bolt).
-   * default key name is 'hdfs.config' */
-  public HdfsSpout withConfigKey(String configKey){
-    this.configKey = configKey;
-    return this;
-  }
-
   public void nextTuple() {
     log.debug("Next Tuple {}", spoutId);
     // 1) First re-emit any previously failed tuples (from retryList)
@@ -214,7 +216,6 @@ public class HdfsSpout extends BaseRichSpout {
     commitTimer.schedule(timerTask, commitFrequencySec * 1000);
   }
 
-
   private static String getFileProgress(FileReader reader) {
     return reader.getFilePath() + " " + reader.getFileOffset();
   }
@@ -268,46 +269,52 @@ public class HdfsSpout extends BaseRichSpout {
     inflight.put(id, tuple);
   }
 
-  public void open(Map conf, TopologyContext context,  SpoutOutputCollector 
collector) {
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector 
collector) {
+    log.info("Opening HDFS Spout {}", spoutId);
     this.conf = conf;
     this.commitTimer = new Timer();
     this.tracker = new ProgressTracker();
-    final String FILE_SYSTEM = "filesystem";
-    log.info("Opening HDFS Spout {}", spoutId);
+    this.hdfsConfig = new Configuration();
+
     this.collector = collector;
     this.hdfsConfig = new Configuration();
     this.tupleCounter = 0;
 
-    for( Object k : conf.keySet() ) {
-      String key = k.toString();
-      if( ! FILE_SYSTEM.equalsIgnoreCase( key ) ) { // to support unit test 
only
-        String val = conf.get(key).toString();
-        log.info("Config setting : " + key + " = " + val);
-        this.hdfsConfig.set(key, val);
-      }
-      else
-        this.hdfs = (FileSystem) conf.get(key);
+    // Hdfs related settings
+    if( conf.containsKey(Configs.HDFS_URI)) {
+      this.hdfsUri = conf.get(Configs.HDFS_URI).toString();
+    } else {
+      throw new RuntimeException(Configs.HDFS_URI + " setting is required");
+    }
 
-      if(key.equalsIgnoreCase(Configs.READER_TYPE)) {
-        readerType = conf.get(key).toString();
-        checkValidReader(readerType);
-      }
+    try {
+      this.hdfs = FileSystem.get(URI.create(hdfsUri), hdfsConfig);
+    } catch (IOException e) {
+      log.error("Unable to instantiate file system", e);
+      throw new RuntimeException("Unable to instantiate file system", e);
     }
 
-    // - Hdfs configs
-    this.hdfsConfig = new Configuration();
-    Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey);
-    if(map != null){
-      for(String key : map.keySet()){
-        this.hdfsConfig.set(key, String.valueOf(map.get(key)));
+
+    if ( conf.containsKey(configKey) ) {
+      Map<String, Object> map = (Map<String, Object>)conf.get(configKey);
+        if(map != null) {
+          for(String keyName : map.keySet()){
+            log.info("HDFS Config override : " + keyName + " = " + 
String.valueOf(map.get(keyName)));
+            this.hdfsConfig.set(keyName, String.valueOf(map.get(keyName)));
+          }
+          try {
+            HdfsSecurityUtil.login(conf, hdfsConfig);
+          } catch (IOException e) {
+            log.error("HDFS Login failed ", e);
+            throw new RuntimeException(e);
+          }
+        } // if(map != null)
       }
-    }
 
-    try {
-      HdfsSecurityUtil.login(conf, hdfsConfig);
-    } catch (IOException e) {
-      log.error("Failed to open " + sourceDirPath);
-      throw new RuntimeException(e);
+    // Reader type config
+    if( conf.containsKey(Configs.READER_TYPE) ) {
+      readerType = conf.get(Configs.READER_TYPE).toString();
+      checkValidReader(readerType);
     }
 
     // -- source dir config
@@ -355,6 +362,8 @@ public class HdfsSpout extends BaseRichSpout {
     else
       this.ackEnabled = false;
 
+    log.info("ACK mode is {}", ackEnabled ? "enabled" : "disabled");
+
     // -- commit frequency - count
     if( conf.get(Configs.COMMIT_FREQ_COUNT) != null )
       commitFrequencyCount = Integer.parseInt( 
conf.get(Configs.COMMIT_FREQ_COUNT).toString() );
@@ -420,8 +429,11 @@ public class HdfsSpout extends BaseRichSpout {
 
   @Override
   public void ack(Object msgId) {
-    if(!ackEnabled)
-      throw new IllegalStateException("Received an ACKs when ack-ing is 
disabled" );
+    if(!ackEnabled) {
+      log.debug("Ack() called but acker count = 0", msgId, spoutId);
+      return;
+    }
+    log.debug("Ack received for msg {} on spout {}", msgId, spoutId);
     MessageId id = (MessageId) msgId;
     inflight.remove(id);
     ++acksSinceLastCommit;
@@ -443,6 +455,7 @@ public class HdfsSpout extends BaseRichSpout {
 
   @Override
   public void fail(Object msgId) {
+    log.debug("Fail() called for msg {} on spout {}", msgId, spoutId);
     super.fail(msgId);
     HdfsUtils.Pair<MessageId, List<Object>> item = HdfsUtils.Pair.of(msgId, 
inflight.remove(msgId));
     retryList.add(item);

http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index 2187444..580993b 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -37,6 +37,7 @@ public class SequenceFileReader<Key extends Writable,Value 
extends Writable>
         extends AbstractFileReader {
   private static final Logger log = LoggerFactory
           .getLogger(SequenceFileReader.class);
+  public static final String[] defaultFields = {"key", "value"};
   private static final int DEFAULT_BUFF_SIZE = 4096;
   public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index 422ff69..641ac74 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -34,6 +34,7 @@ import java.util.Map;
 
 // Todo: Track file offsets instead of line number
 class TextFileReader extends AbstractFileReader {
+  public static final String[] defaultFields = {"line"};
   public static final String CHARSET = "hdfsspout.reader.charset";
   public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 3b07ba2..cdd4020 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -123,7 +123,7 @@ public class TestHdfsSpout {
     conf.put(Configs.COMMIT_FREQ_COUNT, "1");
     conf.put(Configs.COMMIT_FREQ_SEC, "1");
 
-    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, 
TextFileReader.defaultFields);
 
     runSpout(spout,"r11");
 
@@ -144,7 +144,7 @@ public class TestHdfsSpout {
     conf.put(Configs.COMMIT_FREQ_COUNT, "1");
     conf.put(Configs.COMMIT_FREQ_SEC, "1");
     conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable acking
-    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, 
TextFileReader.defaultFields);
 
     // consume file 1
     runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4");
@@ -167,8 +167,8 @@ public class TestHdfsSpout {
     conf.put(Configs.COMMIT_FREQ_COUNT, "1");
     conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // basically disable it
     conf.put(Configs.LOCK_TIMEOUT, lockExpirySec.toString());
-    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
-    HdfsSpout spout2 = makeSpout(1, conf, Configs.TEXT);
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, 
TextFileReader.defaultFields);
+    HdfsSpout spout2 = makeSpout(1, conf, Configs.TEXT, 
TextFileReader.defaultFields);
 
     // consume file 1 partially
     List<String> res = runSpout(spout, "r2");
@@ -214,8 +214,8 @@ public class TestHdfsSpout {
     conf.put(Configs.COMMIT_FREQ_COUNT, "1");
     conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // basically disable it
     conf.put(Configs.LOCK_TIMEOUT, lockExpirySec.toString());
-    HdfsSpout spout = makeSpout(0, conf, Configs.SEQ);
-    HdfsSpout spout2 = makeSpout(1, conf, Configs.SEQ);
+    HdfsSpout spout = makeSpout(0, conf, Configs.SEQ, 
SequenceFileReader.defaultFields);
+    HdfsSpout spout2 = makeSpout(1, conf, Configs.SEQ, 
SequenceFileReader.defaultFields);
 
     // consume file 1 partially
     List<String> res = runSpout(spout, "r2");
@@ -329,7 +329,7 @@ public class TestHdfsSpout {
     conf.put(Configs.COMMIT_FREQ_COUNT, "1");
     conf.put(Configs.COMMIT_FREQ_SEC, "1");
     conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
-    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, 
TextFileReader.defaultFields);
 
     // read few lines from file1 dont ack
     runSpout(spout, "r3");
@@ -405,7 +405,7 @@ public class TestHdfsSpout {
     createSeqFile(fs, file2, 5);
 
     Map conf = getDefaultConfig();
-    HdfsSpout spout = makeSpout(0, conf, Configs.SEQ);
+    HdfsSpout spout = makeSpout(0, conf, Configs.SEQ, 
SequenceFileReader.defaultFields);
 
     // consume both files
     List<String> res = runSpout(spout, "r11");
@@ -432,7 +432,7 @@ public class TestHdfsSpout {
 
     // 2) run spout
     Map conf = getDefaultConfig();
-    HdfsSpout spout = makeSpout(0, conf, 
MockTextFailingReader.class.getName());
+    HdfsSpout spout = makeSpout(0, conf, 
MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields);
     List<String> res = runSpout(spout, "r11");
     String[] expected = new String[] {"[line 0]","[line 1]","[line 2]","[line 
0]","[line 1]","[line 2]"};
     Assert.assertArrayEquals(expected, res.toArray());
@@ -453,7 +453,7 @@ public class TestHdfsSpout {
      Map conf = getDefaultConfig();
      conf.put(Configs.COMMIT_FREQ_COUNT, "1");
      conf.put(Configs.COMMIT_FREQ_SEC, "100"); // make it irrelvant
-     HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+     HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, 
TextFileReader.defaultFields);
 
      // 1) read initial lines in file, then check if lock exists
      List<String> res = runSpout(spout, "r5");
@@ -500,7 +500,7 @@ public class TestHdfsSpout {
     Map conf = getDefaultConfig();
     conf.put(Configs.COMMIT_FREQ_COUNT, "2");  // 1 lock log entry every 2 
tuples
     conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // make it irrelevant for this 
test
-    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, 
TextFileReader.defaultFields);
 
     // 1) read 5 lines in file,
     runSpout(spout, "r5");
@@ -526,7 +526,7 @@ public class TestHdfsSpout {
     conf.put(Configs.COMMIT_FREQ_COUNT, "0");  // disable it
     conf.put(Configs.COMMIT_FREQ_SEC, "2"); // log every 2 sec
 
-    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, 
TextFileReader.defaultFields);
 
     // 1) read 5 lines in file
     runSpout(spout, "r5");
@@ -559,16 +559,17 @@ public class TestHdfsSpout {
     conf.put(Configs.SOURCE_DIR, source.toString());
     conf.put(Configs.ARCHIVE_DIR, archive.toString());
     conf.put(Configs.BAD_DIR, badfiles.toString());
-    conf.put("filesystem", fs);
+    conf.put(Configs.HDFS_URI, hdfsCluster.getURI().toString());
     return conf;
   }
 
 
-  private static HdfsSpout makeSpout(int spoutId, Map conf, String readerType) 
{
-    HdfsSpout spout = new HdfsSpout().withOutputFields("line");
+  private static HdfsSpout makeSpout(int spoutId, Map conf, String readerType, 
String[] outputFields) {
+    HdfsSpout spout = new HdfsSpout().withOutputFields(outputFields);
     MockCollector collector = new MockCollector();
     conf.put(Configs.READER_TYPE, readerType);
     spout.open(conf, new MockTopologyContext(spoutId), collector);
+    conf.put(Configs.HDFS_URI, hdfsCluster.getURI().toString());
     return spout;
   }
 
@@ -687,6 +688,7 @@ public class TestHdfsSpout {
   // Throws IOExceptions for 3rd & 4th call to next(), succeeds on 5th, 
thereafter
   // throws ParseException. Effectively produces 3 lines (1,2 & 3) from each 
file read
   static class MockTextFailingReader extends TextFileReader {
+    public static final String[] defaultFields = {"line"};
     int readAttempts = 0;
 
     public MockTextFailingReader(FileSystem fs, Path file, Map conf) throws 
IOException {

Reply via email to