Adding docs in README.md, Fixed bugs in instantiating Spout (found in Sys 
Testing), Added sample topology under hdfs-starter. Renamed logger from LOG to 
log


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a6fed4c6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a6fed4c6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a6fed4c6

Branch: refs/heads/1.x-branch
Commit: a6fed4c6f8fc973651d1e8e36de78e0a2f8b7c0d
Parents: e50b639
Author: Roshan Naik <[email protected]>
Authored: Tue Dec 29 01:42:58 2015 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |   5 +
 .../jvm/storm/starter/HdfsSpoutTopology.java    | 126 +++++++++++++++++
 external/storm-hdfs/README.md                   |  58 +++++++-
 .../storm/hdfs/spout/AbstractFileReader.java    |  13 +-
 .../org/apache/storm/hdfs/spout/Configs.java    |  16 +--
 .../org/apache/storm/hdfs/spout/FileOffset.java |   2 +-
 .../org/apache/storm/hdfs/spout/FileReader.java |  12 +-
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 136 ++++++++++---------
 .../storm/hdfs/spout/SequenceFileReader.java    |  42 +-----
 .../apache/storm/hdfs/spout/TextFileReader.java |   8 +-
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  |   2 +-
 11 files changed, 282 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 8d6752d..1a7644a 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -135,6 +135,11 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-hdfs</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>0.8.2.1</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java 
b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
new file mode 100644
index 0000000..45a6aaf
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.storm.hdfs.bolt.HdfsBolt;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.spout.Configs;
+import org.apache.storm.hdfs.spout.HdfsSpout;
+
+import java.util.Map;
+
+
+public class HdfsSpoutTopology {
+
+  public static final String SPOUT_ID = "hdfsspout";
+  public static final String BOLT_ID = "hdfsbolt";
+
+  public static final int SPOUT_NUM = 4;
+  public static final int BOLT_NUM = 4;
+  public static final int WORKER_NUM = 4;
+
+
+  private static HdfsBolt makeHdfsBolt(String arg, String destinationDir) {
+    DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat()
+            .withPath(destinationDir)
+            .withExtension(".txt");
+    RecordFormat format = new DelimitedRecordFormat();
+    FileRotationPolicy rotationPolicy = new TimedRotationPolicy(5.0f, 
TimedRotationPolicy.TimeUnit.MINUTES);
+
+    return new HdfsBolt()
+            .withConfigKey("hdfs.config")
+            .withFsUrl(arg)
+            .withFileNameFormat(fileNameFormat)
+            .withRecordFormat(format)
+            .withRotationPolicy(rotationPolicy)
+            .withSyncPolicy(new CountSyncPolicy(1000));
+  }
+
+  /** Copies text file content from sourceDir to destinationDir. Moves source 
files into sourceArchiveDir after it is done consuming them.
+   *    args: topologyName fileFormat sourceDir sourceArchiveDir badDir destinationDir
+   */
+  public static void main(String[] args) throws Exception {
+    // 0 - validate args
+    if (args.length < 6) {
+      System.err.println("Please check command line arguments.");
+      System.err.println("Usage :");
+      System.err.println(HdfsSpoutTopology.class.toString() + " topologyName 
fileFormat sourceDir sourceArchiveDir badDir destinationDir.");
+      System.err.println(" topologyName - topology name.");
+      System.err.println(" fileFormat -  Set to 'TEXT' for reading text files 
or 'SEQ' for sequence files.");
+      System.err.println(" sourceDir  - read files from this HDFS dir using 
HdfsSpout.");
+      System.err.println(" sourceArchiveDir - after a file in sourceDir is 
read completely, it is moved to this HDFS location.");
+      System.err.println(" badDir - files that cannot be read properly will be 
moved to this HDFS location.");
+      System.err.println(" destinationDir - write data out to this HDFS 
location using HDFS bolt.");
+
+      System.err.println();
+      System.exit(-1);
+    }
+
+    // 1 - parse cmd line args
+    String topologyName = args[0];
+    String fileFormat = args[1];
+    String sourceDir = args[2];
+    String sourceArchiveDir = args[3];
+    String badDir = args[4];
+    String destinationDir = args[5];
+
+    // 2 - create and configure spout and bolt
+    HdfsBolt bolt = makeHdfsBolt(args[0], destinationDir);
+    HdfsSpout spout = new HdfsSpout().withOutputFields("line");
+
+    Config conf = new Config();
+    conf.put(Configs.SOURCE_DIR, sourceDir);
+    conf.put(Configs.ARCHIVE_DIR, sourceArchiveDir);
+    conf.put(Configs.BAD_DIR, badDir);
+    conf.put(Configs.READER_TYPE, fileFormat);
+
+    // 3 - Create and configure topology
+    conf.setDebug(true);
+    conf.setNumWorkers(WORKER_NUM);
+    
conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
+
+    TopologyBuilder builder = new TopologyBuilder();
+    builder.setSpout(SPOUT_ID, spout, SPOUT_NUM);
+    builder.setBolt(BOLT_ID, bolt, BOLT_NUM).shuffleGrouping(SPOUT_ID);
+
+    // 4 - submit topology, wait for few min and terminate it
+    Map clusterConf = Utils.readStormConfig();
+    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, 
builder.createTopology());
+    Nimbus.Client client = 
NimbusClient.getConfiguredClient(clusterConf).getClient();
+
+    // 5 - Print metrics every 30 sec, kill topology after 5 min
+    for (int i = 0; i < 10; i++) {
+      Thread.sleep(30 * 1000);
+      FastWordCountTopology.printMetrics(client, topologyName);
+    }
+    FastWordCountTopology.kill(client, topologyName);
+  } // main
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 450a8f5..3a64ae6 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -1,9 +1,58 @@
 # Storm HDFS
 
 Storm components for interacting with HDFS file systems
+ - HDFS Bolt
+ - HDFS Spout
+ 
 
+# HDFS Spout
 
 ## Usage
+
+The following example creates an HDFS spout that reads text files from HDFS 
path hdfs://localhost:54310/source.
+
+```java
+// Instantiate spout
+HdfsSpout textReaderSpout = new HdfsSpout().withOutputFields("line");
+// HdfsSpout seqFileReaderSpout = new 
HdfsSpout().withOutputFields("key","value");
+
+// Configure it
+Config conf = new Config();
+conf.put(Configs.SOURCE_DIR, "hdfs://localhost:54310/source");
+conf.put(Configs.ARCHIVE_DIR, "hdfs://localhost:54310/done");
+conf.put(Configs.BAD_DIR, "hdfs://localhost:54310/badfiles");
+conf.put(Configs.READER_TYPE, "text"); // or 'seq' for sequence files
+
+// Create & configure topology
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("hdfsspout", textReaderSpout, SPOUT_NUM);
+
+// Setup bolts and other topology configuration
+     ..snip..
+
+// Submit topology with config
+StormSubmitter.submitTopologyWithProgressBar("topologyName", conf, 
builder.createTopology());
+```
+
+## HDFS Spout Configuration Settings
+
+| Setting                  | Default     | Description |
+|--------------------------|-------------|-------------|
+|**hdfsspout.reader.type** |             | Indicates the reader for the file 
format. Set to 'seq' for reading sequence files or 'text' for text files. Set 
to a fully qualified class name if using a custom type (that implements 
interface org.apache.storm.hdfs.spout.FileReader)|
+|**hdfsspout.source.dir**  |             | HDFS location from where to read.  
E.g. hdfs://localhost:54310/inputfiles       |
+|**hdfsspout.archive.dir** |             | After a file is processed 
completely it will be moved to this directory. E.g. hdfs://localhost:54310/done|
+|**hdfsspout.badfiles.dir**|             | If there is an error parsing a 
file's contents, the file is moved to this location.  E.g. 
hdfs://localhost:54310/badfiles  |
+|hdfsspout.ignore.suffix   |   .ignore   | File names with this suffix in the 
hdfsspout.source.dir location will not be processed|
+|hdfsspout.lock.dir        | '.lock' subdirectory under hdfsspout.source.dir | 
Dir in which lock files will be created. Concurrent HDFS spout instances 
synchronize using *lock*. Before processing a file the spout instance creates a 
lock file in this directory with same name as input file and deletes this lock 
file after processing the file. Spout also periodically makes a note of its 
progress (wrt reading the input file) in the lock file so that another spout 
instance can resume progress on the same file if the spout dies for any reason.|
+|hdfsspout.commit.count    |    20000    | Record progress in the lock file 
after these many records are processed. If set to 0, this criterion will not be 
used. |
+|hdfsspout.commit.sec      |    10       | Record progress in the lock file 
after these many seconds have elapsed. Must be greater than 0 |
+|hdfsspout.max.outstanding |   10000     | Limits the number of unACKed tuples 
by pausing tuple generation (if ACKers are used in the topology) |
+|hdfsspout.lock.timeout.sec|  5 minutes  | Duration of inactivity after which 
a lock file is considered to be abandoned and ready for another spout to take 
ownership |
+|hdfsspout.clocks.insync   |    true     | Indicates whether clocks on the 
storm machines are in sync (using services like NTP)       |
+
+
+# HDFS Bolt
+## Usage
 The following example will write pipe("|")-delimited files to the HDFS path 
hdfs://localhost:54310/foo. After every
 1,000 tuples it will sync filesystem, making that data visible to other HDFS 
clients. It will rotate files when they
 reach 5 megabytes in size.
@@ -30,6 +79,7 @@ HdfsBolt bolt = new HdfsBolt()
         .withSyncPolicy(syncPolicy);
 ```
 
+
 ### Packaging a Topology
 When packaging your topology, it's important that you use the 
[maven-shade-plugin]() as opposed to the
 [maven-assembly-plugin]().
@@ -115,7 +165,7 @@ Hadoop client version incompatibilites can manifest as 
errors like:
 com.google.protobuf.InvalidProtocolBufferException: Protocol message contained 
an invalid tag (zero)
 ```
 
-## Customization
+## HDFS Bolt Customization
 
 ### Record Formats
 Record format can be controlled by providing an implementation of the 
`org.apache.storm.hdfs.format.RecordFormat`
@@ -236,7 +286,7 @@ If you are using Trident and sequence files you can do 
something like this:
 ```
 
 
-## Support for HDFS Sequence Files
+## HDFS Bolt Support for HDFS Sequence Files
 
 The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write 
storm data to HDFS sequence files:
 
@@ -277,7 +327,7 @@ public interface SequenceFormat extends Serializable {
 }
 ```
 
-## Support for Avro Files
+## HDFS Bolt Support for Avro Files
 
 The `org.apache.storm.hdfs.bolt.AvroGenericRecordBolt` class allows you to 
write Avro objects directly to HDFS:
  
@@ -310,7 +360,7 @@ An `org.apache.avro.Schema` object cannot be directly 
provided since it does not
 The AvroGenericRecordBolt expects to receive tuples containing an Avro 
GenericRecord that conforms to the provided
 schema.
 
-## Trident API
+## HDFS Bolt support for Trident API
 storm-hdfs also includes a Trident `state` implementation for writing data to 
HDFS, with an API that closely mirrors
 that of the bolts.
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
index 6efea81..9996c6c 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
@@ -28,11 +28,10 @@ abstract class AbstractFileReader implements FileReader {
   private final Path file;
   private Fields fields;
 
-  public AbstractFileReader(FileSystem fs, Path file, Fields fieldNames) {
+  public AbstractFileReader(FileSystem fs, Path file) {
     if (fs == null || file == null)
       throw new IllegalArgumentException("file and filesystem args cannot be 
null");
     this.file = file;
-    this.fields = fieldNames;
   }
 
   @Override
@@ -42,16 +41,6 @@ abstract class AbstractFileReader implements FileReader {
 
 
   @Override
-  public Fields getOutputFields() {
-    return fields;
-  }
-
-  @Override
-  public void setFields(String... fieldNames) {
-    this.fields = new Fields(fieldNames);
-  }
-
-  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index 93d775b..00db8eb 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -19,25 +19,25 @@
 package org.apache.storm.hdfs.spout;
 
 public class Configs {
-  public static final String READER_TYPE = "hdfsspout.reader.type";
+  public static final String READER_TYPE = "hdfsspout.reader.type";        // 
Required - choose the file type being consumed
   public static final String TEXT = "text";
   public static final String SEQ = "seq";
 
-  public static final String SOURCE_DIR = "hdfsspout.source.dir";           // 
dir from which to read files
-  public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";         // 
completed files will be moved here
-  public static final String BAD_DIR = "hdfsspout.badfiles.dir";            // 
unpraseable files will be moved here
+  public static final String SOURCE_DIR = "hdfsspout.source.dir";           // 
Required - dir from which to read files
+  public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";         // 
Required - completed files will be moved here
+  public static final String BAD_DIR = "hdfsspout.badfiles.dir";            // 
Required - unparsable files will be moved here
   public static final String LOCK_DIR = "hdfsspout.lock.dir";               // 
dir in which lock files will be created
   public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count";  // 
commit after N records. 0 disables this.
   public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";      // 
commit after N secs. cannot be disabled.
-  public static final String MAX_DUPLICATE = "hdfsspout.max.duplicate";
+  public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding";
   public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";   // 
inactivity duration after which locks are considered candidates for being 
reassigned to another spout
   public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync";     // 
if clocks on machines in the Storm cluster are in sync
-  public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix";     // 
filenames with this suffix will be ignored by the Spout
+  public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix";     // 
filenames with this suffix in archive dir will be ignored by the Spout
 
   public static final String DEFAULT_LOCK_DIR = ".lock";
-  public static final int DEFAULT_COMMIT_FREQ_COUNT = 10000;
+  public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000;
   public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
-  public static final int DEFAULT_MAX_DUPLICATES = 100;
+  public static final int DEFAULT_MAX_OUTSTANDING = 10000;
   public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60; // 5 min
   public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config";
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
index ea8c1e1..ad48779 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
@@ -32,5 +32,5 @@ package org.apache.storm.hdfs.spout;
 interface FileOffset extends Comparable<FileOffset>, Cloneable {
   /** tests if rhs == currOffset+1 */
   boolean isNextOffset(FileOffset rhs);
-  public FileOffset clone();
+  FileOffset clone();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
index 78284cf..1cb1f59 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
@@ -25,13 +25,13 @@ import java.io.IOException;
 import java.util.List;
 
 interface FileReader {
-  public Path getFilePath();
+  Path getFilePath();
 
   /**
    * A simple numeric value may not be sufficient for certain formats 
consequently
    * this is a String.
    */
-  public FileOffset getFileOffset();
+  FileOffset getFileOffset();
 
   /**
    * Get the next tuple from the file
@@ -39,11 +39,7 @@ interface FileReader {
    * @return null if no more data
    * @throws IOException
    */
-  public List<Object> next() throws IOException, ParseException;
+  List<Object> next() throws IOException, ParseException;
 
-  public Fields getOutputFields();
-
-  public void setFields(String... fieldNames);
-
-  public void close();
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index fdb48b4..0e172a9 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -46,20 +46,27 @@ import backtype.storm.tuple.Fields;
 
 public class HdfsSpout extends BaseRichSpout {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class);
-
-  private Path sourceDirPath;
-  private Path archiveDirPath;
-  private Path badFilesDirPath;
+  // user configurable
+  private String readerType;         // required
+  private Fields outputFields;       // required
+  private Path sourceDirPath;        // required
+  private Path archiveDirPath;       // required
+  private Path badFilesDirPath;      // required
   private Path lockDirPath;
 
   private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT;
   private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC;
-  private int maxDuplicates = Configs.DEFAULT_MAX_DUPLICATES;
+  private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING;
   private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT;
   private boolean clocksInSync = true;
 
-  private ProgressTracker tracker = new ProgressTracker();
+  private String inprogress_suffix = ".inprogress";
+  private String ignoreSuffix = ".ignore";
+
+  // other members
+  private static final Logger log = LoggerFactory.getLogger(HdfsSpout.class);
+
+  private ProgressTracker tracker = null;
 
   private FileSystem hdfs;
   private FileReader reader;
@@ -68,11 +75,7 @@ public class HdfsSpout extends BaseRichSpout {
   HashMap<MessageId, List<Object> > inflight = new HashMap<>();
   LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new 
LinkedBlockingQueue<>();
 
-  private String inprogress_suffix = ".inprogress";
-  private String ignoreSuffix = ".ignore";
-
   private Configuration hdfsConfig;
-  private String readerType;
 
   private Map conf = null;
   private FileLock lock;
@@ -85,13 +88,18 @@ public class HdfsSpout extends BaseRichSpout {
   private boolean ackEnabled = false;
   private int acksSinceLastCommit = 0 ;
   private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
-  private final Timer commitTimer = new Timer();
+  private Timer commitTimer;
   private boolean fileReadCompletely = true;
 
-  private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs 
kerberos configs
+  private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs 
Kerberos configs
 
   public HdfsSpout() {
   }
+  /** Names of the output fields. The number of fields depends upon the reader 
type. */
+  public HdfsSpout withOutputFields(String... fields) {
+    outputFields = new Fields(fields);
+    return this;
+  }
 
   public Path getLockDirPath() {
     return lockDirPath;
@@ -101,25 +109,27 @@ public class HdfsSpout extends BaseRichSpout {
     return collector;
   }
 
+  /** config key under which HDFS options are placed. (similar to HDFS bolt).
+   * default key name is 'hdfs.config' */
   public HdfsSpout withConfigKey(String configKey){
     this.configKey = configKey;
     return this;
   }
 
   public void nextTuple() {
-    LOG.debug("Next Tuple {}", spoutId);
+    log.debug("Next Tuple {}", spoutId);
     // 1) First re-emit any previously failed tuples (from retryList)
     if (!retryList.isEmpty()) {
-      LOG.debug("Sending from retry list");
+      log.debug("Sending from retry list");
       HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove();
       emitData(pair.getValue(), pair.getKey());
       return;
     }
 
-    if( ackEnabled  &&  tracker.size()>=maxDuplicates ) {
-      LOG.warn("Waiting for more ACKs before generating new tuples. " +
-               "Progress tracker size has reached limit {}, SpoutID {}"
-              , maxDuplicates, spoutId);
+    if( ackEnabled  &&  tracker.size()>= maxOutstanding) {
+      log.warn("Waiting for more ACKs before generating new tuples. " +
+              "Progress tracker size has reached limit {}, SpoutID {}"
+              , maxOutstanding, spoutId);
       // Don't emit anything .. allow configured spout wait strategy to kick in
       return;
     }
@@ -131,7 +141,7 @@ public class HdfsSpout extends BaseRichSpout {
         if (reader == null) {
           reader = pickNextFile();
           if (reader == null) {
-            LOG.debug("Currently no new files to process under : " + 
sourceDirPath);
+            log.debug("Currently no new files to process under : " + 
sourceDirPath);
             return;
           } else {
             fileReadCompletely=false;
@@ -162,11 +172,11 @@ public class HdfsSpout extends BaseRichSpout {
           }
         }
       } catch (IOException e) {
-        LOG.error("I/O Error processing at file location " + 
getFileProgress(reader), e);
+        log.error("I/O Error processing at file location " + 
getFileProgress(reader), e);
         // don't emit anything .. allow configured spout wait strategy to kick 
in
         return;
       } catch (ParseException e) {
-        LOG.error("Parsing error when processing at file location " + 
getFileProgress(reader) +
+        log.error("Parsing error when processing at file location " + 
getFileProgress(reader) +
                 ". Skipping remainder of file.", e);
         markFileAsBad(reader.getFilePath());
         // Note: We don't return from this method on ParseException to avoid 
triggering the
@@ -187,7 +197,7 @@ public class HdfsSpout extends BaseRichSpout {
         commitTimeElapsed.set(false);
         setupCommitElapseTimer();
       } catch (IOException e) {
-        LOG.error("Unable to commit progress Will retry later. Spout ID = " + 
spoutId, e);
+        log.error("Unable to commit progress Will retry later. Spout ID = " + 
spoutId, e);
       }
     }
   }
@@ -212,9 +222,9 @@ public class HdfsSpout extends BaseRichSpout {
   private void markFileAsDone(Path filePath) {
     try {
       Path newFile = renameCompletedFile(reader.getFilePath());
-      LOG.info("Completed processing {}. Spout Id = {} ", newFile, spoutId);
+      log.info("Completed processing {}. Spout Id = {} ", newFile, spoutId);
     } catch (IOException e) {
-      LOG.error("Unable to archive completed file" + filePath + " Spout ID " + 
spoutId, e);
+      log.error("Unable to archive completed file" + filePath + " Spout ID " + 
spoutId, e);
     }
     closeReaderAndResetTrackers();
   }
@@ -225,13 +235,13 @@ public class HdfsSpout extends BaseRichSpout {
     String originalName = new Path(fileNameMinusSuffix).getName();
     Path  newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName);
 
-    LOG.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= 
{}", originalName, newFile, tracker.getCommitPosition(), spoutId);
+    log.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= 
{}", originalName, newFile, tracker.getCommitPosition(), spoutId);
     try {
       if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning 
false or throwing exception
         throw new IOException("Move failed for bad file: " + file); // convert 
false ret value to exception
       }
     } catch (IOException e) {
-      LOG.warn("Error moving bad file: " + file + " to destination " + newFile 
+ " SpoutId =" + spoutId, e);
+      log.warn("Error moving bad file: " + file + " to destination " + newFile 
+ " SpoutId =" + spoutId, e);
     }
     closeReaderAndResetTrackers();
   }
@@ -245,23 +255,25 @@ public class HdfsSpout extends BaseRichSpout {
     reader = null;
     try {
       lock.release();
-      LOG.debug("Spout {} released FileLock. SpoutId = {}", 
lock.getLockFile(), spoutId);
+      log.debug("Spout {} released FileLock. SpoutId = {}", 
lock.getLockFile(), spoutId);
     } catch (IOException e) {
-      LOG.error("Unable to delete lock file : " + this.lock.getLockFile() + " 
SpoutId =" + spoutId, e);
+      log.error("Unable to delete lock file : " + this.lock.getLockFile() + " 
SpoutId =" + spoutId, e);
     }
     lock = null;
   }
 
   protected void emitData(List<Object> tuple, MessageId id) {
-    LOG.debug("Emitting - {}", id);
+    log.debug("Emitting - {}", id);
     this.collector.emit(tuple, id);
     inflight.put(id, tuple);
   }
 
   public void open(Map conf, TopologyContext context,  SpoutOutputCollector 
collector) {
     this.conf = conf;
+    this.commitTimer = new Timer();
+    this.tracker = new ProgressTracker();
     final String FILE_SYSTEM = "filesystem";
-    LOG.info("Opening HDFS Spout {}", spoutId);
+    log.info("Opening HDFS Spout {}", spoutId);
     this.collector = collector;
     this.hdfsConfig = new Configuration();
     this.tupleCounter = 0;
@@ -270,7 +282,7 @@ public class HdfsSpout extends BaseRichSpout {
       String key = k.toString();
       if( ! FILE_SYSTEM.equalsIgnoreCase( key ) ) { // to support unit test 
only
         String val = conf.get(key).toString();
-        LOG.info("Config setting : " + key + " = " + val);
+        log.info("Config setting : " + key + " = " + val);
         this.hdfsConfig.set(key, val);
       }
       else
@@ -294,20 +306,20 @@ public class HdfsSpout extends BaseRichSpout {
     try {
       HdfsSecurityUtil.login(conf, hdfsConfig);
     } catch (IOException e) {
-      LOG.error("Failed to open " + sourceDirPath);
+      log.error("Failed to open " + sourceDirPath);
       throw new RuntimeException(e);
     }
 
     // -- source dir config
     if ( !conf.containsKey(Configs.SOURCE_DIR) ) {
-      LOG.error(Configs.SOURCE_DIR + " setting is required");
+      log.error(Configs.SOURCE_DIR + " setting is required");
       throw new RuntimeException(Configs.SOURCE_DIR + " setting is required");
     }
     this.sourceDirPath = new Path( conf.get(Configs.SOURCE_DIR).toString() );
 
     // -- archive dir config
     if ( !conf.containsKey(Configs.ARCHIVE_DIR) ) {
-      LOG.error(Configs.ARCHIVE_DIR + " setting is required");
+      log.error(Configs.ARCHIVE_DIR + " setting is required");
       throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required");
     }
     this.archiveDirPath = new Path( conf.get(Configs.ARCHIVE_DIR).toString() );
@@ -315,14 +327,14 @@ public class HdfsSpout extends BaseRichSpout {
 
     // -- bad files dir config
     if ( !conf.containsKey(Configs.BAD_DIR) ) {
-      LOG.error(Configs.BAD_DIR + " setting is required");
+      log.error(Configs.BAD_DIR + " setting is required");
       throw new RuntimeException(Configs.BAD_DIR + " setting is required");
     }
 
     this.badFilesDirPath = new Path(conf.get(Configs.BAD_DIR).toString());
     validateOrMakeDir(hdfs, badFilesDirPath, "bad files");
 
-            // -- ignore filename suffix
+    // -- ignore file names config
     if ( conf.containsKey(Configs.IGNORE_SUFFIX) ) {
       this.ignoreSuffix = conf.get(Configs.IGNORE_SUFFIX).toString();
     }
@@ -348,12 +360,15 @@ public class HdfsSpout extends BaseRichSpout {
       commitFrequencyCount = Integer.parseInt( 
conf.get(Configs.COMMIT_FREQ_COUNT).toString() );
 
     // -- commit frequency - seconds
-    if( conf.get(Configs.COMMIT_FREQ_SEC) != null )
-      commitFrequencySec = Integer.parseInt( 
conf.get(Configs.COMMIT_FREQ_SEC).toString() );
+    if( conf.get(Configs.COMMIT_FREQ_SEC) != null ) {
+      commitFrequencySec = 
Integer.parseInt(conf.get(Configs.COMMIT_FREQ_SEC).toString());
+      if(commitFrequencySec<=0)
+        throw new RuntimeException(Configs.COMMIT_FREQ_SEC + " setting must be 
greater than 0");
+    }
 
     // -- max duplicate
-    if( conf.get(Configs.MAX_DUPLICATE) !=null )
-      maxDuplicates = Integer.parseInt( 
conf.get(Configs.MAX_DUPLICATE).toString() );
+    if( conf.get(Configs.MAX_OUTSTANDING) !=null )
+      maxOutstanding = Integer.parseInt( 
conf.get(Configs.MAX_OUTSTANDING).toString() );
 
     // -- clocks in sync
     if( conf.get(Configs.CLOCKS_INSYNC) !=null )
@@ -370,15 +385,15 @@ public class HdfsSpout extends BaseRichSpout {
     try {
       if(fs.exists(dir)) {
         if(! fs.isDirectory(dir) ) {
-          LOG.error(dirDescription + " directory is a file, not a dir. " + 
dir);
+          log.error(dirDescription + " directory is a file, not a dir. " + 
dir);
           throw new RuntimeException(dirDescription + " directory is a file, 
not a dir. " + dir);
         }
       } else if(! fs.mkdirs(dir) ) {
-        LOG.error("Unable to create " + dirDescription + " directory " + dir);
+        log.error("Unable to create " + dirDescription + " directory " + dir);
         throw new RuntimeException("Unable to create " + dirDescription + " 
directory " + dir);
       }
     } catch (IOException e) {
-      LOG.error("Unable to create " + dirDescription + " directory " + dir, e);
+      log.error("Unable to create " + dirDescription + " directory " + dir, e);
       throw new RuntimeException("Unable to create " + dirDescription + " 
directory " + dir, e);
     }
   }
@@ -395,10 +410,10 @@ public class HdfsSpout extends BaseRichSpout {
       classType.getConstructor(FileSystem.class, Path.class, Map.class);
       return;
     } catch (ClassNotFoundException e) {
-      LOG.error(readerType + " not found in classpath.", e);
+      log.error(readerType + " not found in classpath.", e);
       throw new IllegalArgumentException(readerType + " not found in 
classpath.", e);
     } catch (NoSuchMethodException e) {
-      LOG.error(readerType + " is missing the expected constructor for 
Readers.", e);
+      log.error(readerType + " is missing the expected constructor for 
Readers.", e);
       throw new IllegalArgumentException(readerType + " is missing the 
expected constuctor for Readers.");
     }
   }
@@ -438,10 +453,10 @@ public class HdfsSpout extends BaseRichSpout {
       // 1) If there are any abandoned files, pick oldest one
       lock = getOldestExpiredLock();
       if (lock != null) {
-        LOG.debug("Spout {} now took over ownership of abandoned FileLock {}" 
, spoutId, lock.getLockFile());
+        log.debug("Spout {} now took over ownership of abandoned FileLock {}", 
spoutId, lock.getLockFile());
         Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath);
         String resumeFromOffset = lock.getLastLogEntry().fileOffset;
-        LOG.info("Resuming processing of abandoned file : {}", file);
+        log.info("Resuming processing of abandoned file : {}", file);
         return createFileReader(file, resumeFromOffset);
       }
 
@@ -456,17 +471,17 @@ public class HdfsSpout extends BaseRichSpout {
 
         lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
         if( lock==null ) {
-          LOG.debug("Unable to get FileLock, so skipping file: {}", file);
+          log.debug("Unable to get FileLock, so skipping file: {}", file);
           continue; // could not lock, so try another file.
         }
-        LOG.info("Processing : {} ", file);
+        log.info("Processing : {} ", file);
         Path newFile = renameSelectedFile(file);
         return createFileReader(newFile);
       }
 
       return null;
     } catch (IOException e) {
-      LOG.error("Unable to select next file for consumption " + sourceDirPath, 
e);
+      log.error("Unable to select next file for consumption " + sourceDirPath, 
e);
       return null;
     }
   }
@@ -483,12 +498,12 @@ public class HdfsSpout extends BaseRichSpout {
     if (dirlock == null) {
       dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, 
lockTimeoutSec);
       if (dirlock == null) {
-        LOG.debug("Spout {} could not take over ownership of DirLock for {}" , 
spoutId, lockDirPath);
+        log.debug("Spout {} could not take over ownership of DirLock for {}", 
spoutId, lockDirPath);
         return null;
       }
-      LOG.debug("Spout {} now took over ownership of abandoned DirLock for {}" 
, spoutId, lockDirPath);
+      log.debug("Spout {} now took over ownership of abandoned DirLock for 
{}", spoutId, lockDirPath);
     } else {
-      LOG.debug("Spout {} now owns DirLock for {}", spoutId, lockDirPath);
+      log.debug("Spout {} now owns DirLock for {}", spoutId, lockDirPath);
     }
 
     try {
@@ -520,7 +535,7 @@ public class HdfsSpout extends BaseRichSpout {
       }
     } finally {
       dirlock.release();
-      LOG.debug("Released DirLock {}, SpoutID {} ", dirlock.getLockFile(), 
spoutId);
+      log.debug("Released DirLock {}, SpoutID {} ", dirlock.getLockFile(), 
spoutId);
     }
   }
 
@@ -546,7 +561,7 @@ public class HdfsSpout extends BaseRichSpout {
       Constructor<?> constructor = clsType.getConstructor(FileSystem.class, 
Path.class, Map.class);
       return (FileReader) constructor.newInstance(this.hdfs, file, conf);
     } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
+      log.error(e.getMessage(), e);
       throw new RuntimeException("Unable to instantiate " + readerType, e);
     }
   }
@@ -571,7 +586,7 @@ public class HdfsSpout extends BaseRichSpout {
       Constructor<?> constructor = clsType.getConstructor(FileSystem.class, 
Path.class, Map.class, String.class);
       return (FileReader) constructor.newInstance(this.hdfs, file, conf, 
offset);
     } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
+      log.error(e.getMessage(), e);
       throw new RuntimeException("Unable to instantiate " + readerType, e);
     }
   }
@@ -609,17 +624,16 @@ public class HdfsSpout extends BaseRichSpout {
     String newName = new Path(fileNameMinusSuffix).getName();
 
     Path  newFile = new Path( archiveDirPath + Path.SEPARATOR + newName );
-    LOG.debug("Renaming complete file to {} ", newFile);
-    LOG.info("Completed file {}", fileNameMinusSuffix );
+    log.info("Completed consuming file {}", fileNameMinusSuffix);
     if (!hdfs.rename(file, newFile) ) {
       throw new IOException("Rename failed for file: " + file);
     }
+    log.debug("Renamed completed file {} to {} ", file, newFile);
     return newFile;
   }
 
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    Fields fields = reader.getOutputFields();
-    declarer.declare(fields);
+    declarer.declare(outputFields);
   }
 
   static class MessageId implements  Comparable<MessageId> {

http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index 308d1c6..2187444 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -33,10 +33,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-// Todo: Track file offsets instead of line number
 public class SequenceFileReader<Key extends Writable,Value extends Writable>
         extends AbstractFileReader {
-  private static final Logger LOG = LoggerFactory
+  private static final Logger log = LoggerFactory
           .getLogger(SequenceFileReader.class);
   private static final int DEFAULT_BUFF_SIZE = 4096;
   public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
@@ -45,12 +44,6 @@ public class SequenceFileReader<Key extends Writable,Value 
extends Writable>
 
   private final SequenceFileReader.Offset offset;
 
-  private static final String DEFAULT_KEYNAME = "key";
-  private static final String DEFAULT_VALNAME = "value";
-
-  private String keyName;
-  private String valueName;
-
 
   private final Key key;
   private final Value value;
@@ -58,9 +51,7 @@ public class SequenceFileReader<Key extends Writable,Value 
extends Writable>
 
   public SequenceFileReader(FileSystem fs, Path file, Map conf)
           throws IOException {
-    super(fs, file, new Fields(DEFAULT_KEYNAME, DEFAULT_VALNAME));
-    this.keyName = DEFAULT_KEYNAME;
-    this.valueName = DEFAULT_VALNAME;
+    super(fs, file);
     int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : 
Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
     this.reader = new SequenceFile.Reader(fs.getConf(),  
SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) );
     this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), 
fs.getConf() );
@@ -70,9 +61,7 @@ public class SequenceFileReader<Key extends Writable,Value 
extends Writable>
 
   public SequenceFileReader(FileSystem fs, Path file, Map conf, String offset)
           throws IOException {
-    super(fs, file, new Fields(DEFAULT_KEYNAME, DEFAULT_VALNAME));
-    this.keyName = DEFAULT_KEYNAME;
-    this.valueName = DEFAULT_VALNAME;
+    super(fs, file);
     int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : 
Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
     this.offset = new SequenceFileReader.Offset(offset);
     this.reader = new SequenceFile.Reader(fs.getConf(),  
SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) );
@@ -88,29 +77,6 @@ public class SequenceFileReader<Key extends Writable,Value 
extends Writable>
     }
   }
 
-  public String getKeyName() {
-    return keyName;
-  }
-
-  public void setKeyName(String name) {
-    if (name == null)
-      throw new IllegalArgumentException("keyName cannot be null");
-    this.keyName = name;
-    setFields(keyName, valueName);
-
-  }
-
-  public String getValueName() {
-    return valueName;
-  }
-
-  public void setValueName(String name) {
-    if (name == null)
-      throw new IllegalArgumentException("valueName cannot be null");
-    this.valueName = name;
-    setFields(keyName, valueName);
-  }
-
   public List<Object> next() throws IOException, ParseException {
     if( reader.next(key, value) ) {
       ArrayList<Object> result = new ArrayList<Object>(2);
@@ -126,7 +92,7 @@ public class SequenceFileReader<Key extends Writable,Value 
extends Writable>
     try {
       reader.close();
     } catch (IOException e) {
-      LOG.warn("Ignoring error when closing file " + getFilePath(), e);
+      log.warn("Ignoring error when closing file " + getFilePath(), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index fdea42a..422ff69 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -37,12 +37,10 @@ class TextFileReader extends AbstractFileReader {
   public static final String CHARSET = "hdfsspout.reader.charset";
   public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
 
-  public static final String DEFAULT_FIELD_NAME = "line";
-
   private static final int DEFAULT_BUFF_SIZE = 4096;
 
   private BufferedReader reader;
-  private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
+  private final Logger log = LoggerFactory.getLogger(TextFileReader.class);
   private TextFileReader.Offset offset;
 
   public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException 
{
@@ -55,7 +53,7 @@ class TextFileReader extends AbstractFileReader {
 
   private TextFileReader(FileSystem fs, Path file, Map conf, 
TextFileReader.Offset startOffset)
           throws IOException {
-    super(fs, file, new Fields(DEFAULT_FIELD_NAME));
+    super(fs, file);
     offset = startOffset;
     FSDataInputStream in = fs.open(file);
 
@@ -102,7 +100,7 @@ class TextFileReader extends AbstractFileReader {
     try {
       reader.close();
     } catch (IOException e) {
-      LOG.warn("Ignoring error when closing file " + getFilePath(), e);
+      log.warn("Ignoring error when closing file " + getFilePath(), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 203a63b..3b07ba2 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -565,7 +565,7 @@ public class TestHdfsSpout {
 
 
   private static HdfsSpout makeSpout(int spoutId, Map conf, String readerType) 
{
-    HdfsSpout spout = new HdfsSpout();
+    HdfsSpout spout = new HdfsSpout().withOutputFields("line");
     MockCollector collector = new MockCollector();
     conf.put(Configs.READER_TYPE, readerType);
     spout.open(conf, new MockTopologyContext(spoutId), collector);

Reply via email to