Added docs in README.md; fixed bugs in instantiating Spout (found in system testing); added sample topology under hdfs-starter; renamed logger from LOG to log
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a6fed4c6 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a6fed4c6 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a6fed4c6 Branch: refs/heads/1.x-branch Commit: a6fed4c6f8fc973651d1e8e36de78e0a2f8b7c0d Parents: e50b639 Author: Roshan Naik <[email protected]> Authored: Tue Dec 29 01:42:58 2015 -0800 Committer: Roshan Naik <[email protected]> Committed: Thu Jan 14 11:34:56 2016 -0800 ---------------------------------------------------------------------- examples/storm-starter/pom.xml | 5 + .../jvm/storm/starter/HdfsSpoutTopology.java | 126 +++++++++++++++++ external/storm-hdfs/README.md | 58 +++++++- .../storm/hdfs/spout/AbstractFileReader.java | 13 +- .../org/apache/storm/hdfs/spout/Configs.java | 16 +-- .../org/apache/storm/hdfs/spout/FileOffset.java | 2 +- .../org/apache/storm/hdfs/spout/FileReader.java | 12 +- .../org/apache/storm/hdfs/spout/HdfsSpout.java | 136 ++++++++++--------- .../storm/hdfs/spout/SequenceFileReader.java | 42 +----- .../apache/storm/hdfs/spout/TextFileReader.java | 8 +- .../apache/storm/hdfs/spout/TestHdfsSpout.java | 2 +- 11 files changed, 282 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/examples/storm-starter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index 8d6752d..1a7644a 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -135,6 +135,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-hdfs</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.1</version> 
http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java new file mode 100644 index 0000000..45a6aaf --- /dev/null +++ b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.Nimbus; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; +import org.apache.storm.hdfs.bolt.HdfsBolt; +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat; +import org.apache.storm.hdfs.bolt.format.RecordFormat; +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; +import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; +import org.apache.storm.hdfs.spout.Configs; +import org.apache.storm.hdfs.spout.HdfsSpout; + +import java.util.Map; + + +public class HdfsSpoutTopology { + + public static final String SPOUT_ID = "hdfsspout"; + public static final String BOLT_ID = "hdfsbolt"; + + public static final int SPOUT_NUM = 4; + public static final int BOLT_NUM = 4; + public static final int WORKER_NUM = 4; + + + private static HdfsBolt makeHdfsBolt(String arg, String destinationDir) { + DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat() + .withPath(destinationDir) + .withExtension(".txt"); + RecordFormat format = new DelimitedRecordFormat(); + FileRotationPolicy rotationPolicy = new TimedRotationPolicy(5.0f, TimedRotationPolicy.TimeUnit.MINUTES); + + return new HdfsBolt() + .withConfigKey("hdfs.config") + .withFsUrl(arg) + .withFileNameFormat(fileNameFormat) + .withRecordFormat(format) + .withRotationPolicy(rotationPolicy) + .withSyncPolicy(new CountSyncPolicy(1000)); + } + + /** Copies text file content from sourceDir to destinationDir. 
Moves source files into sourceDir after its done consuming + * args: sourceDir sourceArchiveDir badDir destinationDir + */ + public static void main(String[] args) throws Exception { + // 0 - validate args + if (args.length < 6) { + System.err.println("Please check command line arguments."); + System.err.println("Usage :"); + System.err.println(HdfsSpoutTopology.class.toString() + " topologyName fileFormat sourceDir sourceArchiveDir badDir destinationDir."); + System.err.println(" topologyName - topology name."); + System.err.println(" fileFormat - Set to 'TEXT' for reading text files or 'SEQ' for sequence files."); + System.err.println(" sourceDir - read files from this HDFS dir using HdfsSpout."); + System.err.println(" sourceArchiveDir - after a file in sourceDir is read completely, it is moved to this HDFS location."); + System.err.println(" badDir - files that cannot be read properly will be moved to this HDFS location."); + System.err.println(" destinationDir - write data out to this HDFS location using HDFS bolt."); + + System.err.println(); + System.exit(-1); + } + + // 1 - parse cmd line args + String topologyName = args[0]; + String fileFormat = args[1]; + String sourceDir = args[2]; + String sourceArchiveDir = args[3]; + String badDir = args[4]; + String destinationDir = args[5]; + + // 2 - create and configure spout and bolt + HdfsBolt bolt = makeHdfsBolt(args[0], destinationDir); + HdfsSpout spout = new HdfsSpout().withOutputFields("line"); + + Config conf = new Config(); + conf.put(Configs.SOURCE_DIR, sourceDir); + conf.put(Configs.ARCHIVE_DIR, sourceArchiveDir); + conf.put(Configs.BAD_DIR, badDir); + conf.put(Configs.READER_TYPE, fileFormat); + + // 3 - Create and configure topology + conf.setDebug(true); + conf.setNumWorkers(WORKER_NUM); + conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class); + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(SPOUT_ID, spout, SPOUT_NUM); + builder.setBolt(BOLT_ID, 
bolt, BOLT_NUM).shuffleGrouping(SPOUT_ID); + + // 4 - submit topology, wait for few min and terminate it + Map clusterConf = Utils.readStormConfig(); + StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); + Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + + // 5 - Print metrics every 30 sec, kill topology after 5 min + for (int i = 0; i < 10; i++) { + Thread.sleep(30 * 1000); + FastWordCountTopology.printMetrics(client, topologyName); + } + FastWordCountTopology.kill(client, topologyName); + } // main + +} http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/README.md ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md index 450a8f5..3a64ae6 100644 --- a/external/storm-hdfs/README.md +++ b/external/storm-hdfs/README.md @@ -1,9 +1,58 @@ # Storm HDFS Storm components for interacting with HDFS file systems + - HDFS Bolt + - HDFS Spout + +# HDFS Spout ## Usage + +The following example creates an HDFS spout that reads text files from HDFS path hdfs://localhost:54310/source. + +```java +// Instantiate spout +HdfsSpout textReaderSpout = new HdfsSpout().withOutputFields("line"); +// HdfsSpout seqFileReaderSpout = new HdfsSpout().withOutputFields("key","value"); + +// Configure it +Config conf = new Config(); +conf.put(Configs.SOURCE_DIR, "hdfs://localhost:54310/source"); +conf.put(Configs.ARCHIVE_DIR, "hdfs://localhost:54310/done"); +conf.put(Configs.BAD_DIR, "hdfs://localhost:54310/badfiles"); +conf.put(Configs.READER_TYPE, "text"); // or 'seq' for sequence files + +// Create & configure topology +TopologyBuilder builder = new TopologyBuilder(); +builder.setSpout("hdfsspout", textReaderSpout, SPOUT_NUM); + +// Setup bolts and other topology configuration + ..snip.. 
+ +// Submit topology with config +StormSubmitter.submitTopologyWithProgressBar("topologyName", conf, builder.createTopology()); +``` + +## HDFS Spout Configuration Settings + +| Setting | Default | Description | +|--------------------------|-------------|-------------| +|**hdfsspout.reader.type** | | Indicates the reader for the file format. Set to 'seq' for reading sequence files or 'text' for text files. Set to a fully qualified class name if using a custom type (that implements interface org.apache.storm.hdfs.spout.FileReader)| +|**hdfsspout.source.dir** | | HDFS location from where to read. E.g. hdfs://localhost:54310/inputfiles | +|**hdfsspout.archive.dir** | | After a file is processed completely it will be moved to this directory. E.g. hdfs://localhost:54310/done| +|**hdfsspout.badfiles.dir**| | if there is an error parsing a file's contents, the file is moved to this location. E.g. hdfs://localhost:54310/badfiles | +|hdfsspout.ignore.suffix | .ignore | File names with this suffix in the in the hdfsspout.source.dir location will not be processed| +|hdfsspout.lock.dir | '.lock' subdirectory under hdfsspout.source.dir | Dir in which lock files will be created. Concurrent HDFS spout instances synchronize using *lock*. Before processing a file the spout instance creates a lock file in this directory with same name as input file and deletes this lock file after processing the file. Spout also periodically makes a note of its progress (wrt reading the input file) in the lock file so that another spout instance can resume progress on the same file if the spout dies for any reason.| +|hdfsspout.commit.count | 20000 | Record progress in the lock file after these many records are processed. If set to 0, this criterion will not be used. | +|hdfsspout.commit.sec | 10 | Record progress in the lock file after these many seconds have elapsed. 
Must be greater than 0 | +|hdfsspout.max.outstanding | 10000 | Limits the number of unACKed tuples by pausing tuple generation (if ACKers are used in the topology) | +|hdfsspout.lock.timeout.sec| 5 minutes | Duration of inactivity after which a lock file is considered to be abandoned and ready for another spout to take ownership | +|hdfsspout.clocks.insync | true | Indicates whether clocks on the storm machines are in sync (using services like NTP) | + + +# HDFS Bolt +## Usage The following example will write pipe("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every 1,000 tuples it will sync filesystem, making that data visible to other HDFS clients. It will rotate files when they reach 5 megabytes in size. @@ -30,6 +79,7 @@ HdfsBolt bolt = new HdfsBolt() .withSyncPolicy(syncPolicy); ``` + ### Packaging a Topology When packaging your topology, it's important that you use the [maven-shade-plugin]() as opposed to the [maven-assembly-plugin](). @@ -115,7 +165,7 @@ Hadoop client version incompatibilites can manifest as errors like: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero) ``` -## Customization +## HDFS Bolt Customization ### Record Formats Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat` @@ -236,7 +286,7 @@ If you are using Trident and sequence files you can do something like this: ``` -## Support for HDFS Sequence Files +## HDFS Bolt Support for HDFS Sequence Files The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write storm data to HDFS sequence files: @@ -277,7 +327,7 @@ public interface SequenceFormat extends Serializable { } ``` -## Support for Avro Files +## HDFS Bolt Support for Avro Files The `org.apache.storm.hdfs.bolt.AvroGenericRecordBolt` class allows you to write Avro objects directly to HDFS: @@ -310,7 +360,7 @@ An `org.apache.avro.Schema` object cannot be directly provided since it does 
not The AvroGenericRecordBolt expects to receive tuples containing an Avro GenericRecord that conforms to the provided schema. -## Trident API +## HDFS Bolt support for Trident API storm-hdfs also includes a Trident `state` implementation for writing data to HDFS, with an API that closely mirrors that of the bolts. http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java index 6efea81..9996c6c 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java @@ -28,11 +28,10 @@ abstract class AbstractFileReader implements FileReader { private final Path file; private Fields fields; - public AbstractFileReader(FileSystem fs, Path file, Fields fieldNames) { + public AbstractFileReader(FileSystem fs, Path file) { if (fs == null || file == null) throw new IllegalArgumentException("file and filesystem args cannot be null"); this.file = file; - this.fields = fieldNames; } @Override @@ -42,16 +41,6 @@ abstract class AbstractFileReader implements FileReader { @Override - public Fields getOutputFields() { - return fields; - } - - @Override - public void setFields(String... 
fieldNames) { - this.fields = new Fields(fieldNames); - } - - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java index 93d775b..00db8eb 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java @@ -19,25 +19,25 @@ package org.apache.storm.hdfs.spout; public class Configs { - public static final String READER_TYPE = "hdfsspout.reader.type"; + public static final String READER_TYPE = "hdfsspout.reader.type"; // Required - chose the file type being consumed public static final String TEXT = "text"; public static final String SEQ = "seq"; - public static final String SOURCE_DIR = "hdfsspout.source.dir"; // dir from which to read files - public static final String ARCHIVE_DIR = "hdfsspout.archive.dir"; // completed files will be moved here - public static final String BAD_DIR = "hdfsspout.badfiles.dir"; // unpraseable files will be moved here + public static final String SOURCE_DIR = "hdfsspout.source.dir"; // Required - dir from which to read files + public static final String ARCHIVE_DIR = "hdfsspout.archive.dir"; // Required - completed files will be moved here + public static final String BAD_DIR = "hdfsspout.badfiles.dir"; // Required - unparsable files will be moved here public static final String LOCK_DIR = "hdfsspout.lock.dir"; // dir in which lock files will be created public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records. 0 disables this. 
public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec"; // commit after N secs. cannot be disabled. - public static final String MAX_DUPLICATE = "hdfsspout.max.duplicate"; + public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding"; public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec"; // inactivity duration after which locks are considered candidates for being reassigned to another spout public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; // if clocks on machines in the Storm cluster are in sync - public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix"; // filenames with this suffix will be ignored by the Spout + public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix"; // filenames with this suffix in archive dir will be ignored by the Spout public static final String DEFAULT_LOCK_DIR = ".lock"; - public static final int DEFAULT_COMMIT_FREQ_COUNT = 10000; + public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000; public static final int DEFAULT_COMMIT_FREQ_SEC = 10; - public static final int DEFAULT_MAX_DUPLICATES = 100; + public static final int DEFAULT_MAX_OUTSTANDING = 10000; public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60; // 5 min public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config"; http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java index ea8c1e1..ad48779 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java @@ -32,5 +32,5 @@ package org.apache.storm.hdfs.spout; interface FileOffset extends 
Comparable<FileOffset>, Cloneable { /** tests if rhs == currOffset+1 */ boolean isNextOffset(FileOffset rhs); - public FileOffset clone(); + FileOffset clone(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java index 78284cf..1cb1f59 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java @@ -25,13 +25,13 @@ import java.io.IOException; import java.util.List; interface FileReader { - public Path getFilePath(); + Path getFilePath(); /** * A simple numeric value may not be sufficient for certain formats consequently * this is a String. */ - public FileOffset getFileOffset(); + FileOffset getFileOffset(); /** * Get the next tuple from the file @@ -39,11 +39,7 @@ interface FileReader { * @return null if no more data * @throws IOException */ - public List<Object> next() throws IOException, ParseException; + List<Object> next() throws IOException, ParseException; - public Fields getOutputFields(); - - public void setFields(String... 
fieldNames); - - public void close(); + void close(); } http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index fdb48b4..0e172a9 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -46,20 +46,27 @@ import backtype.storm.tuple.Fields; public class HdfsSpout extends BaseRichSpout { - private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class); - - private Path sourceDirPath; - private Path archiveDirPath; - private Path badFilesDirPath; + // user configurable + private String readerType; // required + private Fields outputFields; // required + private Path sourceDirPath; // required + private Path archiveDirPath; // required + private Path badFilesDirPath; // required private Path lockDirPath; private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT; private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC; - private int maxDuplicates = Configs.DEFAULT_MAX_DUPLICATES; + private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING; private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT; private boolean clocksInSync = true; - private ProgressTracker tracker = new ProgressTracker(); + private String inprogress_suffix = ".inprogress"; + private String ignoreSuffix = ".ignore"; + + // other members + private static final Logger log = LoggerFactory.getLogger(HdfsSpout.class); + + private ProgressTracker tracker = null; private FileSystem hdfs; private FileReader reader; @@ -68,11 +75,7 @@ public class HdfsSpout extends BaseRichSpout { HashMap<MessageId, List<Object> > inflight = new 
HashMap<>(); LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>(); - private String inprogress_suffix = ".inprogress"; - private String ignoreSuffix = ".ignore"; - private Configuration hdfsConfig; - private String readerType; private Map conf = null; private FileLock lock; @@ -85,13 +88,18 @@ public class HdfsSpout extends BaseRichSpout { private boolean ackEnabled = false; private int acksSinceLastCommit = 0 ; private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false); - private final Timer commitTimer = new Timer(); + private Timer commitTimer; private boolean fileReadCompletely = true; - private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs kerberos configs + private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs public HdfsSpout() { } + /** Name of the output field names. Number of fields depends upon the reader type */ + public HdfsSpout withOutputFields(String... fields) { + outputFields = new Fields(fields); + return this; + } public Path getLockDirPath() { return lockDirPath; @@ -101,25 +109,27 @@ public class HdfsSpout extends BaseRichSpout { return collector; } + /** config key under which HDFS options are placed. (similar to HDFS bolt). + * default key name is 'hdfs.config' */ public HdfsSpout withConfigKey(String configKey){ this.configKey = configKey; return this; } public void nextTuple() { - LOG.debug("Next Tuple {}", spoutId); + log.debug("Next Tuple {}", spoutId); // 1) First re-emit any previously failed tuples (from retryList) if (!retryList.isEmpty()) { - LOG.debug("Sending from retry list"); + log.debug("Sending from retry list"); HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove(); emitData(pair.getValue(), pair.getKey()); return; } - if( ackEnabled && tracker.size()>=maxDuplicates ) { - LOG.warn("Waiting for more ACKs before generating new tuples. 
" + - "Progress tracker size has reached limit {}, SpoutID {}" - , maxDuplicates, spoutId); + if( ackEnabled && tracker.size()>= maxOutstanding) { + log.warn("Waiting for more ACKs before generating new tuples. " + + "Progress tracker size has reached limit {}, SpoutID {}" + , maxOutstanding, spoutId); // Don't emit anything .. allow configured spout wait strategy to kick in return; } @@ -131,7 +141,7 @@ public class HdfsSpout extends BaseRichSpout { if (reader == null) { reader = pickNextFile(); if (reader == null) { - LOG.debug("Currently no new files to process under : " + sourceDirPath); + log.debug("Currently no new files to process under : " + sourceDirPath); return; } else { fileReadCompletely=false; @@ -162,11 +172,11 @@ public class HdfsSpout extends BaseRichSpout { } } } catch (IOException e) { - LOG.error("I/O Error processing at file location " + getFileProgress(reader), e); + log.error("I/O Error processing at file location " + getFileProgress(reader), e); // don't emit anything .. allow configured spout wait strategy to kick in return; } catch (ParseException e) { - LOG.error("Parsing error when processing at file location " + getFileProgress(reader) + + log.error("Parsing error when processing at file location " + getFileProgress(reader) + ". Skipping remainder of file.", e); markFileAsBad(reader.getFilePath()); // Note: We don't return from this method on ParseException to avoid triggering the @@ -187,7 +197,7 @@ public class HdfsSpout extends BaseRichSpout { commitTimeElapsed.set(false); setupCommitElapseTimer(); } catch (IOException e) { - LOG.error("Unable to commit progress Will retry later. Spout ID = " + spoutId, e); + log.error("Unable to commit progress Will retry later. Spout ID = " + spoutId, e); } } } @@ -212,9 +222,9 @@ public class HdfsSpout extends BaseRichSpout { private void markFileAsDone(Path filePath) { try { Path newFile = renameCompletedFile(reader.getFilePath()); - LOG.info("Completed processing {}. 
Spout Id = {} ", newFile, spoutId); + log.info("Completed processing {}. Spout Id = {} ", newFile, spoutId); } catch (IOException e) { - LOG.error("Unable to archive completed file" + filePath + " Spout ID " + spoutId, e); + log.error("Unable to archive completed file" + filePath + " Spout ID " + spoutId, e); } closeReaderAndResetTrackers(); } @@ -225,13 +235,13 @@ public class HdfsSpout extends BaseRichSpout { String originalName = new Path(fileNameMinusSuffix).getName(); Path newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName); - LOG.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), spoutId); + log.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), spoutId); try { if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception } } catch (IOException e) { - LOG.warn("Error moving bad file: " + file + " to destination " + newFile + " SpoutId =" + spoutId, e); + log.warn("Error moving bad file: " + file + " to destination " + newFile + " SpoutId =" + spoutId, e); } closeReaderAndResetTrackers(); } @@ -245,23 +255,25 @@ public class HdfsSpout extends BaseRichSpout { reader = null; try { lock.release(); - LOG.debug("Spout {} released FileLock. SpoutId = {}", lock.getLockFile(), spoutId); + log.debug("Spout {} released FileLock. 
SpoutId = {}", lock.getLockFile(), spoutId); } catch (IOException e) { - LOG.error("Unable to delete lock file : " + this.lock.getLockFile() + " SpoutId =" + spoutId, e); + log.error("Unable to delete lock file : " + this.lock.getLockFile() + " SpoutId =" + spoutId, e); } lock = null; } protected void emitData(List<Object> tuple, MessageId id) { - LOG.debug("Emitting - {}", id); + log.debug("Emitting - {}", id); this.collector.emit(tuple, id); inflight.put(id, tuple); } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; + this.commitTimer = new Timer(); + this.tracker = new ProgressTracker(); final String FILE_SYSTEM = "filesystem"; - LOG.info("Opening HDFS Spout {}", spoutId); + log.info("Opening HDFS Spout {}", spoutId); this.collector = collector; this.hdfsConfig = new Configuration(); this.tupleCounter = 0; @@ -270,7 +282,7 @@ public class HdfsSpout extends BaseRichSpout { String key = k.toString(); if( ! FILE_SYSTEM.equalsIgnoreCase( key ) ) { // to support unit test only String val = conf.get(key).toString(); - LOG.info("Config setting : " + key + " = " + val); + log.info("Config setting : " + key + " = " + val); this.hdfsConfig.set(key, val); } else @@ -294,20 +306,20 @@ public class HdfsSpout extends BaseRichSpout { try { HdfsSecurityUtil.login(conf, hdfsConfig); } catch (IOException e) { - LOG.error("Failed to open " + sourceDirPath); + log.error("Failed to open " + sourceDirPath); throw new RuntimeException(e); } // -- source dir config if ( !conf.containsKey(Configs.SOURCE_DIR) ) { - LOG.error(Configs.SOURCE_DIR + " setting is required"); + log.error(Configs.SOURCE_DIR + " setting is required"); throw new RuntimeException(Configs.SOURCE_DIR + " setting is required"); } this.sourceDirPath = new Path( conf.get(Configs.SOURCE_DIR).toString() ); // -- archive dir config if ( !conf.containsKey(Configs.ARCHIVE_DIR) ) { - LOG.error(Configs.ARCHIVE_DIR + " setting is required"); + log.error(Configs.ARCHIVE_DIR 
+ " setting is required"); throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required"); } this.archiveDirPath = new Path( conf.get(Configs.ARCHIVE_DIR).toString() ); @@ -315,14 +327,14 @@ public class HdfsSpout extends BaseRichSpout { // -- bad files dir config if ( !conf.containsKey(Configs.BAD_DIR) ) { - LOG.error(Configs.BAD_DIR + " setting is required"); + log.error(Configs.BAD_DIR + " setting is required"); throw new RuntimeException(Configs.BAD_DIR + " setting is required"); } this.badFilesDirPath = new Path(conf.get(Configs.BAD_DIR).toString()); validateOrMakeDir(hdfs, badFilesDirPath, "bad files"); - // -- ignore filename suffix + // -- ignore file names config if ( conf.containsKey(Configs.IGNORE_SUFFIX) ) { this.ignoreSuffix = conf.get(Configs.IGNORE_SUFFIX).toString(); } @@ -348,12 +360,15 @@ public class HdfsSpout extends BaseRichSpout { commitFrequencyCount = Integer.parseInt( conf.get(Configs.COMMIT_FREQ_COUNT).toString() ); // -- commit frequency - seconds - if( conf.get(Configs.COMMIT_FREQ_SEC) != null ) - commitFrequencySec = Integer.parseInt( conf.get(Configs.COMMIT_FREQ_SEC).toString() ); + if( conf.get(Configs.COMMIT_FREQ_SEC) != null ) { + commitFrequencySec = Integer.parseInt(conf.get(Configs.COMMIT_FREQ_SEC).toString()); + if(commitFrequencySec<=0) + throw new RuntimeException(Configs.COMMIT_FREQ_SEC + " setting must be greater than 0"); + } // -- max duplicate - if( conf.get(Configs.MAX_DUPLICATE) !=null ) - maxDuplicates = Integer.parseInt( conf.get(Configs.MAX_DUPLICATE).toString() ); + if( conf.get(Configs.MAX_OUTSTANDING) !=null ) + maxOutstanding = Integer.parseInt( conf.get(Configs.MAX_OUTSTANDING).toString() ); // -- clocks in sync if( conf.get(Configs.CLOCKS_INSYNC) !=null ) @@ -370,15 +385,15 @@ public class HdfsSpout extends BaseRichSpout { try { if(fs.exists(dir)) { if(! fs.isDirectory(dir) ) { - LOG.error(dirDescription + " directory is a file, not a dir. 
" + dir); + log.error(dirDescription + " directory is a file, not a dir. " + dir); throw new RuntimeException(dirDescription + " directory is a file, not a dir. " + dir); } } else if(! fs.mkdirs(dir) ) { - LOG.error("Unable to create " + dirDescription + " directory " + dir); + log.error("Unable to create " + dirDescription + " directory " + dir); throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir); } } catch (IOException e) { - LOG.error("Unable to create " + dirDescription + " directory " + dir, e); + log.error("Unable to create " + dirDescription + " directory " + dir, e); throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir, e); } } @@ -395,10 +410,10 @@ public class HdfsSpout extends BaseRichSpout { classType.getConstructor(FileSystem.class, Path.class, Map.class); return; } catch (ClassNotFoundException e) { - LOG.error(readerType + " not found in classpath.", e); + log.error(readerType + " not found in classpath.", e); throw new IllegalArgumentException(readerType + " not found in classpath.", e); } catch (NoSuchMethodException e) { - LOG.error(readerType + " is missing the expected constructor for Readers.", e); + log.error(readerType + " is missing the expected constructor for Readers.", e); throw new IllegalArgumentException(readerType + " is missing the expected constuctor for Readers."); } } @@ -438,10 +453,10 @@ public class HdfsSpout extends BaseRichSpout { // 1) If there are any abandoned files, pick oldest one lock = getOldestExpiredLock(); if (lock != null) { - LOG.debug("Spout {} now took over ownership of abandoned FileLock {}" , spoutId, lock.getLockFile()); + log.debug("Spout {} now took over ownership of abandoned FileLock {}", spoutId, lock.getLockFile()); Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath); String resumeFromOffset = lock.getLastLogEntry().fileOffset; - LOG.info("Resuming processing of abandoned file : {}", file); + log.info("Resuming processing 
of abandoned file : {}", file); return createFileReader(file, resumeFromOffset); } @@ -456,17 +471,17 @@ public class HdfsSpout extends BaseRichSpout { lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId); if( lock==null ) { - LOG.debug("Unable to get FileLock, so skipping file: {}", file); + log.debug("Unable to get FileLock, so skipping file: {}", file); continue; // could not lock, so try another file. } - LOG.info("Processing : {} ", file); + log.info("Processing : {} ", file); Path newFile = renameSelectedFile(file); return createFileReader(newFile); } return null; } catch (IOException e) { - LOG.error("Unable to select next file for consumption " + sourceDirPath, e); + log.error("Unable to select next file for consumption " + sourceDirPath, e); return null; } } @@ -483,12 +498,12 @@ public class HdfsSpout extends BaseRichSpout { if (dirlock == null) { dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, lockTimeoutSec); if (dirlock == null) { - LOG.debug("Spout {} could not take over ownership of DirLock for {}" , spoutId, lockDirPath); + log.debug("Spout {} could not take over ownership of DirLock for {}", spoutId, lockDirPath); return null; } - LOG.debug("Spout {} now took over ownership of abandoned DirLock for {}" , spoutId, lockDirPath); + log.debug("Spout {} now took over ownership of abandoned DirLock for {}", spoutId, lockDirPath); } else { - LOG.debug("Spout {} now owns DirLock for {}", spoutId, lockDirPath); + log.debug("Spout {} now owns DirLock for {}", spoutId, lockDirPath); } try { @@ -520,7 +535,7 @@ public class HdfsSpout extends BaseRichSpout { } } finally { dirlock.release(); - LOG.debug("Released DirLock {}, SpoutID {} ", dirlock.getLockFile(), spoutId); + log.debug("Released DirLock {}, SpoutID {} ", dirlock.getLockFile(), spoutId); } } @@ -546,7 +561,7 @@ public class HdfsSpout extends BaseRichSpout { Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class); return (FileReader) 
constructor.newInstance(this.hdfs, file, conf); } catch (Exception e) { - LOG.error(e.getMessage(), e); + log.error(e.getMessage(), e); throw new RuntimeException("Unable to instantiate " + readerType, e); } } @@ -571,7 +586,7 @@ public class HdfsSpout extends BaseRichSpout { Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class, String.class); return (FileReader) constructor.newInstance(this.hdfs, file, conf, offset); } catch (Exception e) { - LOG.error(e.getMessage(), e); + log.error(e.getMessage(), e); throw new RuntimeException("Unable to instantiate " + readerType, e); } } @@ -609,17 +624,16 @@ public class HdfsSpout extends BaseRichSpout { String newName = new Path(fileNameMinusSuffix).getName(); Path newFile = new Path( archiveDirPath + Path.SEPARATOR + newName ); - LOG.debug("Renaming complete file to {} ", newFile); - LOG.info("Completed file {}", fileNameMinusSuffix ); + log.info("Completed consuming file {}", fileNameMinusSuffix); if (!hdfs.rename(file, newFile) ) { throw new IOException("Rename failed for file: " + file); } + log.debug("Renamed completed file {} to {} ", file, newFile); return newFile; } public void declareOutputFields(OutputFieldsDeclarer declarer) { - Fields fields = reader.getOutputFields(); - declarer.declare(fields); + declarer.declare(outputFields); } static class MessageId implements Comparable<MessageId> { http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java index 308d1c6..2187444 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java +++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java @@ -33,10 +33,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; -// Todo: Track file offsets instead of line number public class SequenceFileReader<Key extends Writable,Value extends Writable> extends AbstractFileReader { - private static final Logger LOG = LoggerFactory + private static final Logger log = LoggerFactory .getLogger(SequenceFileReader.class); private static final int DEFAULT_BUFF_SIZE = 4096; public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; @@ -45,12 +44,6 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable> private final SequenceFileReader.Offset offset; - private static final String DEFAULT_KEYNAME = "key"; - private static final String DEFAULT_VALNAME = "value"; - - private String keyName; - private String valueName; - private final Key key; private final Value value; @@ -58,9 +51,7 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable> public SequenceFileReader(FileSystem fs, Path file, Map conf) throws IOException { - super(fs, file, new Fields(DEFAULT_KEYNAME, DEFAULT_VALNAME)); - this.keyName = DEFAULT_KEYNAME; - this.valueName = DEFAULT_VALNAME; + super(fs, file); int bufferSize = !conf.containsKey(BUFFER_SIZE) ? 
DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) ); this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf() ); @@ -70,9 +61,7 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable> public SequenceFileReader(FileSystem fs, Path file, Map conf, String offset) throws IOException { - super(fs, file, new Fields(DEFAULT_KEYNAME, DEFAULT_VALNAME)); - this.keyName = DEFAULT_KEYNAME; - this.valueName = DEFAULT_VALNAME; + super(fs, file); int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); this.offset = new SequenceFileReader.Offset(offset); this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) ); @@ -88,29 +77,6 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable> } } - public String getKeyName() { - return keyName; - } - - public void setKeyName(String name) { - if (name == null) - throw new IllegalArgumentException("keyName cannot be null"); - this.keyName = name; - setFields(keyName, valueName); - - } - - public String getValueName() { - return valueName; - } - - public void setValueName(String name) { - if (name == null) - throw new IllegalArgumentException("valueName cannot be null"); - this.valueName = name; - setFields(keyName, valueName); - } - public List<Object> next() throws IOException, ParseException { if( reader.next(key, value) ) { ArrayList<Object> result = new ArrayList<Object>(2); @@ -126,7 +92,7 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable> try { reader.close(); } catch (IOException e) { - LOG.warn("Ignoring error when closing file " + getFilePath(), e); + log.warn("Ignoring error when closing file " + getFilePath(), e); } } 
http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java index fdea42a..422ff69 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java @@ -37,12 +37,10 @@ class TextFileReader extends AbstractFileReader { public static final String CHARSET = "hdfsspout.reader.charset"; public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes"; - public static final String DEFAULT_FIELD_NAME = "line"; - private static final int DEFAULT_BUFF_SIZE = 4096; private BufferedReader reader; - private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class); + private final Logger log = LoggerFactory.getLogger(TextFileReader.class); private TextFileReader.Offset offset; public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException { @@ -55,7 +53,7 @@ class TextFileReader extends AbstractFileReader { private TextFileReader(FileSystem fs, Path file, Map conf, TextFileReader.Offset startOffset) throws IOException { - super(fs, file, new Fields(DEFAULT_FIELD_NAME)); + super(fs, file); offset = startOffset; FSDataInputStream in = fs.open(file); @@ -102,7 +100,7 @@ class TextFileReader extends AbstractFileReader { try { reader.close(); } catch (IOException e) { - LOG.warn("Ignoring error when closing file " + getFilePath(), e); + log.warn("Ignoring error when closing file " + getFilePath(), e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/a6fed4c6/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java ---------------------------------------------------------------------- 
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java index 203a63b..3b07ba2 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java @@ -565,7 +565,7 @@ public class TestHdfsSpout { private static HdfsSpout makeSpout(int spoutId, Map conf, String readerType) { - HdfsSpout spout = new HdfsSpout(); + HdfsSpout spout = new HdfsSpout().withOutputFields("line"); MockCollector collector = new MockCollector(); conf.put(Configs.READER_TYPE, readerType); spout.open(conf, new MockTopologyContext(spoutId), collector);
