Added config hdfsspout.hdfs. Improved logging, bug fix in how hdfs specific settings are used, added storm-starter topology HdfsSpoutTopology, outputFields now need to be specified on spout. Improved docs, updated UTs.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ac1322fb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ac1322fb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ac1322fb Branch: refs/heads/1.x-branch Commit: ac1322fbe7bf8bc3dfb614118312cee37c75b44e Parents: a6fed4c Author: Roshan Naik <[email protected]> Authored: Wed Dec 30 18:37:40 2015 -0800 Committer: Roshan Naik <[email protected]> Committed: Thu Jan 14 11:34:56 2016 -0800 ---------------------------------------------------------------------- .../jvm/storm/starter/HdfsSpoutTopology.java | 75 ++++++++++------ external/storm-hdfs/README.md | 92 ++++++++++++++++---- .../org/apache/storm/hdfs/spout/Configs.java | 1 + .../org/apache/storm/hdfs/spout/DirLock.java | 8 +- .../org/apache/storm/hdfs/spout/HdfsSpout.java | 89 +++++++++++-------- .../storm/hdfs/spout/SequenceFileReader.java | 1 + .../apache/storm/hdfs/spout/TextFileReader.java | 1 + .../apache/storm/hdfs/spout/TestHdfsSpout.java | 32 +++---- 8 files changed, 198 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java index 45a6aaf..3837943 100644 --- a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java +++ b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java @@ -33,6 +33,12 @@ import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.spout.Configs; import org.apache.storm.hdfs.spout.HdfsSpout; +import backtype.storm.topology.base.BaseRichBolt; +import 
backtype.storm.topology.*; +import backtype.storm.tuple.*; +import backtype.storm.task.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; @@ -42,37 +48,47 @@ public class HdfsSpoutTopology { public static final String SPOUT_ID = "hdfsspout"; public static final String BOLT_ID = "hdfsbolt"; - public static final int SPOUT_NUM = 4; - public static final int BOLT_NUM = 4; - public static final int WORKER_NUM = 4; + public static final int SPOUT_NUM = 1; + public static final int BOLT_NUM = 1; + public static final int WORKER_NUM = 1; + public static class ConstBolt extends BaseRichBolt { + private static final long serialVersionUID = -5313598399155365865L; + public static final String FIELDS = "message"; + private OutputCollector collector; + private static final Logger log = LoggerFactory.getLogger(ConstBolt.class); - private static HdfsBolt makeHdfsBolt(String arg, String destinationDir) { - DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat() - .withPath(destinationDir) - .withExtension(".txt"); - RecordFormat format = new DelimitedRecordFormat(); - FileRotationPolicy rotationPolicy = new TimedRotationPolicy(5.0f, TimedRotationPolicy.TimeUnit.MINUTES); + public ConstBolt() { + } + + @Override + public void prepare(Map conf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } - return new HdfsBolt() - .withConfigKey("hdfs.config") - .withFsUrl(arg) - .withFileNameFormat(fileNameFormat) - .withRecordFormat(format) - .withRotationPolicy(rotationPolicy) - .withSyncPolicy(new CountSyncPolicy(1000)); - } + @Override + public void execute(Tuple tuple) { + log.info("Received tuple : {}", tuple.getValue(0)); + collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(FIELDS)); + } + } // class /** Copies text file content from sourceDir to destinationDir. 
Moves source files into sourceDir after its done consuming * args: sourceDir sourceArchiveDir badDir destinationDir */ public static void main(String[] args) throws Exception { // 0 - validate args - if (args.length < 6) { + if (args.length < 7) { System.err.println("Please check command line arguments."); System.err.println("Usage :"); - System.err.println(HdfsSpoutTopology.class.toString() + " topologyName fileFormat sourceDir sourceArchiveDir badDir destinationDir."); + System.err.println(HdfsSpoutTopology.class.toString() + " topologyName hdfsUri fileFormat sourceDir sourceArchiveDir badDir destinationDir."); System.err.println(" topologyName - topology name."); + System.err.println(" hdfsUri - hdfs name node URI"); System.err.println(" fileFormat - Set to 'TEXT' for reading text files or 'SEQ' for sequence files."); System.err.println(" sourceDir - read files from this HDFS dir using HdfsSpout."); System.err.println(" sourceArchiveDir - after a file in sourceDir is read completely, it is moved to this HDFS location."); @@ -85,14 +101,15 @@ public class HdfsSpoutTopology { // 1 - parse cmd line args String topologyName = args[0]; - String fileFormat = args[1]; - String sourceDir = args[2]; - String sourceArchiveDir = args[3]; - String badDir = args[4]; - String destinationDir = args[5]; + String hdfsUri = args[1]; + String fileFormat = args[2]; + String sourceDir = args[3]; + String sourceArchiveDir = args[4]; + String badDir = args[5]; + String destinationDir = args[6]; // 2 - create and configure spout and bolt - HdfsBolt bolt = makeHdfsBolt(args[0], destinationDir); + ConstBolt bolt = new ConstBolt(); HdfsSpout spout = new HdfsSpout().withOutputFields("line"); Config conf = new Config(); @@ -100,6 +117,10 @@ public class HdfsSpoutTopology { conf.put(Configs.ARCHIVE_DIR, sourceArchiveDir); conf.put(Configs.BAD_DIR, badDir); conf.put(Configs.READER_TYPE, fileFormat); + conf.put(Configs.HDFS_URI, hdfsUri); + conf.setDebug(true); + conf.setNumWorkers(1); + 
conf.setMaxTaskParallelism(1); // 3 - Create and configure topology conf.setDebug(true); @@ -115,8 +136,8 @@ public class HdfsSpoutTopology { StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); - // 5 - Print metrics every 30 sec, kill topology after 5 min - for (int i = 0; i < 10; i++) { + // 5 - Print metrics every 30 sec, kill topology after 20 min + for (int i = 0; i < 40; i++) { Thread.sleep(30 * 1000); FastWordCountTopology.printMetrics(client, topologyName); } http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/README.md ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md index 3a64ae6..237fc8c 100644 --- a/external/storm-hdfs/README.md +++ b/external/storm-hdfs/README.md @@ -1,20 +1,61 @@ # Storm HDFS Storm components for interacting with HDFS file systems - - HDFS Bolt - HDFS Spout - + - HDFS Bolt # HDFS Spout +Hdfs spout is intended to allow feeding data into Storm from a HDFS directory. +It will actively monitor the directory to consume any new files that appear in the directory. + +**Impt**: Hdfs spout assumes that the files being made visible to it in the monitored directory +are NOT actively being written to. Only after a files is completely written should it be made +visible to the spout. This can be achieved by either writing the files out to another directory +and once completely written, move it to the monitored directory. Alternatively the file +can be created with a '.ignore' suffix in the monitored directory and after data is completely +written, rename it without the suffix. File names with a '.ignore' suffix are ignored +by the spout. + +When the spout is actively consuming a file, ite renames the file with a '.inprogress' suffix. 
+After consuming all the contents in the file, the file will be moved to a configurable *done* +directory and the '.inprogress' suffix will be dropped. + +**Concurrency** If multiple spout instances are used in the topology, each instance will consume +a different file. Synchronization among spout instances is done using lock files created in +(by default) a '.lock' subdirectory under the monitored directory. A file with the same name +as the file being consumed (with the in progress suffix) is created in the lock directory. +Once the file is completely consumed, the corresponding lock file is deleted. + +**Recovery from failure** +Periodically, the spout also records progress information with respect to how much of the file has been +consumed in the lock file. In case of a crash of the spout instance (or force kill of topology) +another spout can take over the file and resume from the location recorded in the lock file. + +Certain error conditions can cause lock files to go stale. Basically the lock file exists, +but no spout actively owns it, and therefore the file will not be deleted. Usually this indicates +that the corresponding input file has also not been completely processed. A configuration +'hdfsspout.lock.timeout.sec' can be set to specify the duration of inactivity after which a lock file +should be considered stale. Stale lock files are candidates for automatic transfer of ownership to +another spout. + +**Lock on .lock Directory** +The .lock directory contains another DIRLOCK file which is used to co-ordinate accesses to the +.lock dir itself among spout instances. A spout will try to create it when it needs access to +the .lock directory and then delete it when done. In case of a topology crash or force kill, +if this file still exists, it should be deleted to allow the new topology instance to regain +full access to the .lock directory and resume normal processing. 
+ ## Usage The following example creates an HDFS spout that reads text files from HDFS path hdfs://localhost:54310/source. ```java // Instantiate spout -HdfsSpout textReaderSpout = new HdfsSpout().withOutputFields("line"); -// HdfsSpout seqFileReaderSpout = new HdfsSpout().withOutputFields("key","value"); +HdfsSpout textReaderSpout = new HdfsSpout().withOutputFields(TextFileReader.defaultFields); +// HdfsSpout seqFileReaderSpout = new HdfsSpout().withOutputFields(SequenceFileReader.defaultFields); + +// textReaderSpout.withConfigKey("custom.keyname"); // Optional. Not required normally unless you need to change the keyname use to provide hds settings. This keyname defaults to 'hdfs.config' // Configure it Config conf = new Config(); @@ -34,21 +75,34 @@ builder.setSpout("hdfsspout", textReaderSpout, SPOUT_NUM); StormSubmitter.submitTopologyWithProgressBar("topologyName", conf, builder.createTopology()); ``` -## HDFS Spout Configuration Settings - -| Setting | Default | Description | -|--------------------------|-------------|-------------| -|**hdfsspout.reader.type** | | Indicates the reader for the file format. Set to 'seq' for reading sequence files or 'text' for text files. Set to a fully qualified class name if using a custom type (that implements interface org.apache.storm.hdfs.spout.FileReader)| -|**hdfsspout.source.dir** | | HDFS location from where to read. E.g. hdfs://localhost:54310/inputfiles | -|**hdfsspout.archive.dir** | | After a file is processed completely it will be moved to this directory. E.g. hdfs://localhost:54310/done| -|**hdfsspout.badfiles.dir**| | if there is an error parsing a file's contents, the file is moved to this location. E.g. hdfs://localhost:54310/badfiles | -|hdfsspout.ignore.suffix | .ignore | File names with this suffix in the in the hdfsspout.source.dir location will not be processed| -|hdfsspout.lock.dir | '.lock' subdirectory under hdfsspout.source.dir | Dir in which lock files will be created. 
Concurrent HDFS spout instances synchronize using *lock*. Before processing a file the spout instance creates a lock file in this directory with same name as input file and deletes this lock file after processing the file. Spout also periodically makes a note of its progress (wrt reading the input file) in the lock file so that another spout instance can resume progress on the same file if the spout dies for any reason.| -|hdfsspout.commit.count | 20000 | Record progress in the lock file after these many records are processed. If set to 0, this criterion will not be used. | -|hdfsspout.commit.sec | 10 | Record progress in the lock file after these many seconds have elapsed. Must be greater than 0 | -|hdfsspout.max.outstanding | 10000 | Limits the number of unACKed tuples by pausing tuple generation (if ACKers are used in the topology) | -|hdfsspout.lock.timeout.sec| 5 minutes | Duration of inactivity after which a lock file is considered to be abandoned and ready for another spout to take ownership | -|hdfsspout.clocks.insync | true | Indicates whether clocks on the storm machines are in sync (using services like NTP) | +## Configuration Settings +Class HdfsSpout provides the following methods for configuration: + +`HdfsSpout withOutputFields(String... fields)` : This sets the names for the output fields. +The number of fields depends upon the reader being used. For convenience, built-in reader types +expose a static member called `defaultFields` that can be used for this. + + `HdfsSpout withConfigKey(String configKey)` +Allows overriding the default key name (hdfs.config) with a new name for specifying HDFS configs. Typically used +to provide kerberos keytabs. + +Only settings mentioned in **bold** are required. + +| Setting | Default | Description | +|------------------------------|-------------|-------------| +|**hdfsspout.reader.type** | | Indicates the reader for the file format. Set to 'seq' for reading sequence files or 'text' for text files. 
Set to a fully qualified class name if using a custom type (that implements interface org.apache.storm.hdfs.spout.FileReader)| +|**hdfsspout.hdfs** | | HDFS URI. Example: hdfs://namenodehost:8020 +|**hdfsspout.source.dir** | | HDFS location from where to read. E.g. /data/inputfiles | +|**hdfsspout.archive.dir** | | After a file is processed completely it will be moved to this directory. E.g. /data/done| +|**hdfsspout.badfiles.dir** | | if there is an error parsing a file's contents, the file is moved to this location. E.g. /data/badfiles | +|hdfsspout.lock.dir | '.lock' subdirectory under hdfsspout.source.dir | Dir in which lock files will be created. Concurrent HDFS spout instances synchronize using *lock* files. Before processing a file the spout instance creates a lock file in this directory with same name as input file and deletes this lock file after processing the file. Spout also periodically makes a note of its progress (wrt reading the input file) in the lock file so that another spout instance can resume progress on the same file if the spout dies for any reason. When a toplogy is killed, if a .lock/DIRLOCK file is left behind it can be safely deleted to allow normal resumption of the topology on restart.| +|hdfsspout.ignore.suffix | .ignore | File names with this suffix in the in the hdfsspout.source.dir location will not be processed| +|hdfsspout.commit.count | 20000 | Record progress in the lock file after these many records are processed. If set to 0, this criterion will not be used. | +|hdfsspout.commit.sec | 10 | Record progress in the lock file after these many seconds have elapsed. 
Must be greater than 0 | +|hdfsspout.max.outstanding | 10000 | Limits the number of unACKed tuples by pausing tuple generation (if ACKers are used in the topology) | +|hdfsspout.lock.timeout.sec | 5 minutes | Duration of inactivity after which a lock file is considered to be abandoned and ready for another spout to take ownership | +|hdfsspout.clocks.insync | true | Indicates whether clocks on the storm machines are in sync (using services like NTP) | +|hdfs.config (unless changed) | | Set it to a Map of Key/value pairs indicating the HDFS settings to be used. For example, keytab and principal could be set using this. See section **Using keytabs on all worker hosts** under HDFS bolt below.| # HDFS Bolt http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java index 00db8eb..8911b3c 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java @@ -23,6 +23,7 @@ public class Configs { public static final String TEXT = "text"; public static final String SEQ = "seq"; + public static final String HDFS_URI = "hdfsspout.hdfs"; // Required - HDFS name node public static final String SOURCE_DIR = "hdfsspout.source.dir"; // Required - dir from which to read files public static final String ARCHIVE_DIR = "hdfsspout.archive.dir"; // Required - completed files will be moved here public static final String BAD_DIR = "hdfsspout.badfiles.dir"; // Required - unparsable files will be moved here http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java 
---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java index 0e1182f..cb8e015 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java @@ -81,8 +81,12 @@ public class DirLock { /** Release lock on dir by deleting the lock file */ public void release() throws IOException { - fs.delete(lockFile, false); - log.info("Thread {} released dir lock {} ", threadInfo(), lockFile); + if(!fs.delete(lockFile, false)) { + log.error("Thread {} could not delete dir lock {} ", threadInfo(), lockFile); + } + else { + log.info("Thread {} released dir lock {} ", threadInfo(), lockFile); + } } /** if the lock on the directory is stale, take ownership */ http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index 0e172a9..65a49f3 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -20,6 +20,7 @@ package org.apache.storm.hdfs.spout; import java.io.IOException; import java.lang.reflect.Constructor; +import java.net.URI; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -47,6 +48,7 @@ import backtype.storm.tuple.Fields; public class HdfsSpout extends BaseRichSpout { // user configurable + private String hdfsUri; // required private String readerType; // required private Fields outputFields; // required private Path sourceDirPath; // 
required @@ -101,6 +103,13 @@ public class HdfsSpout extends BaseRichSpout { return this; } + /** set key name under which HDFS options are placed. (similar to HDFS bolt). + * default key name is 'hdfs.config' */ + public HdfsSpout withConfigKey(String configKey) { + this.configKey = configKey; + return this; + } + public Path getLockDirPath() { return lockDirPath; } @@ -109,13 +118,6 @@ public class HdfsSpout extends BaseRichSpout { return collector; } - /** config key under which HDFS options are placed. (similar to HDFS bolt). - * default key name is 'hdfs.config' */ - public HdfsSpout withConfigKey(String configKey){ - this.configKey = configKey; - return this; - } - public void nextTuple() { log.debug("Next Tuple {}", spoutId); // 1) First re-emit any previously failed tuples (from retryList) @@ -214,7 +216,6 @@ public class HdfsSpout extends BaseRichSpout { commitTimer.schedule(timerTask, commitFrequencySec * 1000); } - private static String getFileProgress(FileReader reader) { return reader.getFilePath() + " " + reader.getFileOffset(); } @@ -268,46 +269,52 @@ public class HdfsSpout extends BaseRichSpout { inflight.put(id, tuple); } - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + log.info("Opening HDFS Spout {}", spoutId); this.conf = conf; this.commitTimer = new Timer(); this.tracker = new ProgressTracker(); - final String FILE_SYSTEM = "filesystem"; - log.info("Opening HDFS Spout {}", spoutId); + this.hdfsConfig = new Configuration(); + this.collector = collector; this.hdfsConfig = new Configuration(); this.tupleCounter = 0; - for( Object k : conf.keySet() ) { - String key = k.toString(); - if( ! 
FILE_SYSTEM.equalsIgnoreCase( key ) ) { // to support unit test only - String val = conf.get(key).toString(); - log.info("Config setting : " + key + " = " + val); - this.hdfsConfig.set(key, val); - } - else - this.hdfs = (FileSystem) conf.get(key); + // Hdfs related settings + if( conf.containsKey(Configs.HDFS_URI)) { + this.hdfsUri = conf.get(Configs.HDFS_URI).toString(); + } else { + throw new RuntimeException(Configs.HDFS_URI + " setting is required"); + } - if(key.equalsIgnoreCase(Configs.READER_TYPE)) { - readerType = conf.get(key).toString(); - checkValidReader(readerType); - } + try { + this.hdfs = FileSystem.get(URI.create(hdfsUri), hdfsConfig); + } catch (IOException e) { + log.error("Unable to instantiate file system", e); + throw new RuntimeException("Unable to instantiate file system", e); } - // - Hdfs configs - this.hdfsConfig = new Configuration(); - Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey); - if(map != null){ - for(String key : map.keySet()){ - this.hdfsConfig.set(key, String.valueOf(map.get(key))); + + if ( conf.containsKey(configKey) ) { + Map<String, Object> map = (Map<String, Object>)conf.get(configKey); + if(map != null) { + for(String keyName : map.keySet()){ + log.info("HDFS Config override : " + keyName + " = " + String.valueOf(map.get(keyName))); + this.hdfsConfig.set(keyName, String.valueOf(map.get(keyName))); + } + try { + HdfsSecurityUtil.login(conf, hdfsConfig); + } catch (IOException e) { + log.error("HDFS Login failed ", e); + throw new RuntimeException(e); + } + } // if(map != null) } - } - try { - HdfsSecurityUtil.login(conf, hdfsConfig); - } catch (IOException e) { - log.error("Failed to open " + sourceDirPath); - throw new RuntimeException(e); + // Reader type config + if( conf.containsKey(Configs.READER_TYPE) ) { + readerType = conf.get(Configs.READER_TYPE).toString(); + checkValidReader(readerType); } // -- source dir config @@ -355,6 +362,8 @@ public class HdfsSpout extends BaseRichSpout { else 
this.ackEnabled = false; + log.info("ACK mode is {}", ackEnabled ? "enabled" : "disabled"); + // -- commit frequency - count if( conf.get(Configs.COMMIT_FREQ_COUNT) != null ) commitFrequencyCount = Integer.parseInt( conf.get(Configs.COMMIT_FREQ_COUNT).toString() ); @@ -420,8 +429,11 @@ public class HdfsSpout extends BaseRichSpout { @Override public void ack(Object msgId) { - if(!ackEnabled) - throw new IllegalStateException("Received an ACKs when ack-ing is disabled" ); + if(!ackEnabled) { + log.debug("Ack() called but acker count = 0", msgId, spoutId); + return; + } + log.debug("Ack received for msg {} on spout {}", msgId, spoutId); MessageId id = (MessageId) msgId; inflight.remove(id); ++acksSinceLastCommit; @@ -443,6 +455,7 @@ public class HdfsSpout extends BaseRichSpout { @Override public void fail(Object msgId) { + log.debug("Fail() called for msg {} on spout {}", msgId, spoutId); super.fail(msgId); HdfsUtils.Pair<MessageId, List<Object>> item = HdfsUtils.Pair.of(msgId, inflight.remove(msgId)); retryList.add(item); http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java index 2187444..580993b 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java @@ -37,6 +37,7 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable> extends AbstractFileReader { private static final Logger log = LoggerFactory .getLogger(SequenceFileReader.class); + public static final String[] defaultFields = {"key", "value"}; private static final int DEFAULT_BUFF_SIZE = 4096; public static final 
String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java index 422ff69..641ac74 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java @@ -34,6 +34,7 @@ import java.util.Map; // Todo: Track file offsets instead of line number class TextFileReader extends AbstractFileReader { + public static final String[] defaultFields = {"line"}; public static final String CHARSET = "hdfsspout.reader.charset"; public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; http://git-wip-us.apache.org/repos/asf/storm/blob/ac1322fb/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java index 3b07ba2..cdd4020 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java @@ -123,7 +123,7 @@ public class TestHdfsSpout { conf.put(Configs.COMMIT_FREQ_COUNT, "1"); conf.put(Configs.COMMIT_FREQ_SEC, "1"); - HdfsSpout spout = makeSpout(0, conf, Configs.TEXT); + HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, TextFileReader.defaultFields); runSpout(spout,"r11"); @@ -144,7 +144,7 @@ public class TestHdfsSpout { conf.put(Configs.COMMIT_FREQ_COUNT, "1"); conf.put(Configs.COMMIT_FREQ_SEC, 
"1"); conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable acking - HdfsSpout spout = makeSpout(0, conf, Configs.TEXT); + HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, TextFileReader.defaultFields); // consume file 1 runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4"); @@ -167,8 +167,8 @@ public class TestHdfsSpout { conf.put(Configs.COMMIT_FREQ_COUNT, "1"); conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // basically disable it conf.put(Configs.LOCK_TIMEOUT, lockExpirySec.toString()); - HdfsSpout spout = makeSpout(0, conf, Configs.TEXT); - HdfsSpout spout2 = makeSpout(1, conf, Configs.TEXT); + HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, TextFileReader.defaultFields); + HdfsSpout spout2 = makeSpout(1, conf, Configs.TEXT, TextFileReader.defaultFields); // consume file 1 partially List<String> res = runSpout(spout, "r2"); @@ -214,8 +214,8 @@ public class TestHdfsSpout { conf.put(Configs.COMMIT_FREQ_COUNT, "1"); conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // basically disable it conf.put(Configs.LOCK_TIMEOUT, lockExpirySec.toString()); - HdfsSpout spout = makeSpout(0, conf, Configs.SEQ); - HdfsSpout spout2 = makeSpout(1, conf, Configs.SEQ); + HdfsSpout spout = makeSpout(0, conf, Configs.SEQ, SequenceFileReader.defaultFields); + HdfsSpout spout2 = makeSpout(1, conf, Configs.SEQ, SequenceFileReader.defaultFields); // consume file 1 partially List<String> res = runSpout(spout, "r2"); @@ -329,7 +329,7 @@ public class TestHdfsSpout { conf.put(Configs.COMMIT_FREQ_COUNT, "1"); conf.put(Configs.COMMIT_FREQ_SEC, "1"); conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing - HdfsSpout spout = makeSpout(0, conf, Configs.TEXT); + HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, TextFileReader.defaultFields); // read few lines from file1 dont ack runSpout(spout, "r3"); @@ -405,7 +405,7 @@ public class TestHdfsSpout { createSeqFile(fs, file2, 5); Map conf = getDefaultConfig(); - HdfsSpout spout = makeSpout(0, conf, Configs.SEQ); + HdfsSpout spout = 
makeSpout(0, conf, Configs.SEQ, SequenceFileReader.defaultFields); // consume both files List<String> res = runSpout(spout, "r11"); @@ -432,7 +432,7 @@ public class TestHdfsSpout { // 2) run spout Map conf = getDefaultConfig(); - HdfsSpout spout = makeSpout(0, conf, MockTextFailingReader.class.getName()); + HdfsSpout spout = makeSpout(0, conf, MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields); List<String> res = runSpout(spout, "r11"); String[] expected = new String[] {"[line 0]","[line 1]","[line 2]","[line 0]","[line 1]","[line 2]"}; Assert.assertArrayEquals(expected, res.toArray()); @@ -453,7 +453,7 @@ public class TestHdfsSpout { Map conf = getDefaultConfig(); conf.put(Configs.COMMIT_FREQ_COUNT, "1"); conf.put(Configs.COMMIT_FREQ_SEC, "100"); // make it irrelvant - HdfsSpout spout = makeSpout(0, conf, Configs.TEXT); + HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, TextFileReader.defaultFields); // 1) read initial lines in file, then check if lock exists List<String> res = runSpout(spout, "r5"); @@ -500,7 +500,7 @@ public class TestHdfsSpout { Map conf = getDefaultConfig(); conf.put(Configs.COMMIT_FREQ_COUNT, "2"); // 1 lock log entry every 2 tuples conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // make it irrelevant for this test - HdfsSpout spout = makeSpout(0, conf, Configs.TEXT); + HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, TextFileReader.defaultFields); // 1) read 5 lines in file, runSpout(spout, "r5"); @@ -526,7 +526,7 @@ public class TestHdfsSpout { conf.put(Configs.COMMIT_FREQ_COUNT, "0"); // disable it conf.put(Configs.COMMIT_FREQ_SEC, "2"); // log every 2 sec - HdfsSpout spout = makeSpout(0, conf, Configs.TEXT); + HdfsSpout spout = makeSpout(0, conf, Configs.TEXT, TextFileReader.defaultFields); // 1) read 5 lines in file runSpout(spout, "r5"); @@ -559,16 +559,17 @@ public class TestHdfsSpout { conf.put(Configs.SOURCE_DIR, source.toString()); conf.put(Configs.ARCHIVE_DIR, archive.toString()); conf.put(Configs.BAD_DIR, 
badfiles.toString()); - conf.put("filesystem", fs); + conf.put(Configs.HDFS_URI, hdfsCluster.getURI().toString()); return conf; } - private static HdfsSpout makeSpout(int spoutId, Map conf, String readerType) { - HdfsSpout spout = new HdfsSpout().withOutputFields("line"); + private static HdfsSpout makeSpout(int spoutId, Map conf, String readerType, String[] outputFields) { + HdfsSpout spout = new HdfsSpout().withOutputFields(outputFields); MockCollector collector = new MockCollector(); conf.put(Configs.READER_TYPE, readerType); spout.open(conf, new MockTopologyContext(spoutId), collector); + conf.put(Configs.HDFS_URI, hdfsCluster.getURI().toString()); return spout; } @@ -687,6 +688,7 @@ public class TestHdfsSpout { // Throws IOExceptions for 3rd & 4th call to next(), succeeds on 5th, thereafter // throws ParseException. Effectively produces 3 lines (1,2 & 3) from each file read static class MockTextFailingReader extends TextFileReader { + public static final String[] defaultFields = {"line"}; int readAttempts = 0; public MockTextFailingReader(FileSystem fs, Path file, Map conf) throws IOException {
